Repository: incubator-nifi
Updated Branches:
  refs/heads/develop b7ddf8945 -> 5273a63bf


NIFI-570: Added MongoDB put and get processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/add03e38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/add03e38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/add03e38

Branch: refs/heads/develop
Commit: add03e3893bcb2cabf563f45d929b0f6ab0ffc65
Parents: b7ddf89
Author: Tim Reardon <[email protected]>
Authored: Tue May 19 08:09:02 2015 -0400
Committer: Tim Reardon <[email protected]>
Committed: Tue May 19 08:15:13 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-assembly/NOTICE                       |   4 +
 nifi/nifi-assembly/pom.xml                      |   5 +
 .../nifi-mongodb-nar/pom.xml                    |  37 +++
 .../nifi-mongodb-processors/pom.xml             |  98 +++++++
 .../mongodb/AbstractMongoProcessor.java         |  93 +++++++
 .../nifi/processors/mongodb/GetMongo.java       | 184 ++++++++++++++
 .../nifi/processors/mongodb/PutMongo.java       | 217 ++++++++++++++++
 .../org.apache.nifi.processor.Processor         |  16 ++
 .../nifi/processors/mongodb/GetMongoTest.java   | 201 +++++++++++++++
 .../nifi/processors/mongodb/PutMongoTest.java   | 254 +++++++++++++++++++
 .../nifi-mongodb-bundle/pom.xml                 |  35 +++
 nifi/nifi-nar-bundles/pom.xml                   |  17 +-
 nifi/pom.xml                                    |   6 +
 13 files changed, 1159 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/NOTICE b/nifi/nifi-assembly/NOTICE
index 0edfd96..9d89639 100644
--- a/nifi/nifi-assembly/NOTICE
+++ b/nifi/nifi-assembly/NOTICE
@@ -491,6 +491,10 @@ The following binary components are provided under the 
Apache Software License v
       This product includes software developed by
       Saxonica (http://www.saxonica.com/).
 
+  (ASLv2) MongoDB Java Driver
+    The following NOTICE information applies:
+      Copyright (C) 2008-2013 10gen, Inc.
+
   (ASLv2) Parquet MR
     The following NOTICE information applies:
       Parquet MR

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index 4a12b25..cfe1de6 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -164,6 +164,11 @@ language governing permissions and limitations under the 
License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mongodb-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-solr-nar</artifactId>
             <type>nar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
new file mode 100644
index 0000000..a138f18
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-mongodb-bundle</artifactId>
+        <version>0.1.1-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-mongodb-nar</artifactId>
+    <version>0.1.1-incubating-SNAPSHOT</version>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mongodb-processors</artifactId>
+            <version>0.1.1-incubating-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
new file mode 100644
index 0000000..381d7d6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-mongodb-bundle</artifactId>
+        <version>0.1.1-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-mongodb-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>3.0.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.github.joelittlejohn.embedmongo</groupId>
+                <artifactId>embedmongo-maven-plugin</artifactId>
+                <version>0.1.12</version>
+                <executions>
+                    <execution>
+                        <id>start</id>
+                        <goals>
+                            <goal>start</goal>
+                        </goals>
+                        <phase>test-compile</phase>
+                        <configuration>
+                            <databaseDirectory>${project.build.directory}/embedmongo/db</databaseDirectory>
+                            <logging>file</logging>
+                            <logFile>${project.build.directory}/embedmongo.log</logFile>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>stop</id>
+                        <goals>
+                            <goal>stop</goal>
+                        </goals>
+                        <phase>prepare-package</phase>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
new file mode 100644
index 0000000..fae007f
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import java.io.IOException;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.Document;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+public abstract class AbstractMongoProcessor extends AbstractProcessor {
+    protected static final PropertyDescriptor URI = new 
PropertyDescriptor.Builder()
+        .name("Mongo URI")
+        .description("MongoURI, typically of the form: 
mongodb://host1[:port1][,host2[:port2],...]")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    protected static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
+        .name("Mongo Database Name")
+        .description("The name of the database to use")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    protected static final PropertyDescriptor COLLECTION_NAME = new 
PropertyDescriptor.Builder()
+        .name("Mongo Collection Name")
+        .description("The name of the collection to use")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    protected MongoClient mongoClient;
+
+    /**
+     * Builds the shared {@link MongoClient} from the configured Mongo URI when the
+     * processor is scheduled. A client left over from a previous run is closed first.
+     *
+     * @param context supplies the value of the "Mongo URI" property
+     * @throws IOException declared on the signature; the body itself only rethrows
+     *         whatever client construction raised, after logging it
+     */
+    @OnScheduled
+    public final void createClient(ProcessContext context) throws IOException {
+        if (mongoClient != null) {
+            closeClient();
+        }
+
+        getLogger().info("Creating MongoClient");
+
+        try {
+            final String uri = context.getProperty(URI).getValue();
+            mongoClient = new MongoClient(new MongoClientURI(uri));
+        } catch (Exception e) {
+            // This base class is shared by GetMongo and PutMongo, so report the
+            // concrete processor instead of a hard-coded name.
+            getLogger().error("Failed to schedule {} due to {}", new Object[] { getClass().getSimpleName(), e }, e);
+            throw e;
+        }
+    }
+
+    /** Closes and discards the current MongoClient, if any, when the processor is stopped. */
+    @OnStopped
+    public final void closeClient() {
+        if (mongoClient == null) {
+            return;
+        }
+
+        getLogger().info("Closing MongoClient");
+        mongoClient.close();
+        mongoClient = null;
+    }
+
+    /** Returns the database named by the "Mongo Database Name" property, obtained from the current client. */
+    protected MongoDatabase getDatabase(final ProcessContext context) {
+        return mongoClient.getDatabase(context.getProperty(DATABASE_NAME).getValue());
+    }
+
+    /** Returns the collection named by the "Mongo Collection Name" property within the configured database. */
+    protected MongoCollection<Document> getCollection(final ProcessContext context) {
+        final String name = context.getProperty(COLLECTION_NAME).getValue();
+        final MongoDatabase database = getDatabase(context);
+        return database.getCollection(name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
new file mode 100644
index 0000000..02ce9cf
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.Document;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+
+@Tags({ "mongodb", "read", "get" })
+@CapabilityDescription("Creates FlowFiles from documents in MongoDB")
+public class GetMongo extends AbstractMongoProcessor {
+    /**
+     * Validator that accepts a value only when {@link Document#parse(String)} succeeds on it.
+     * If parsing throws a RuntimeException, the result is invalid and the exception's
+     * class name is used as the explanation.
+     */
+    public static final Validator DOCUMENT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(value);
+            try {
+                Document.parse(value);
+            } catch (final RuntimeException e) {
+                return result.explanation(e.getClass().getName()).valid(false).build();
+            }
+            return result.explanation(null).valid(true).build();
+        }
+    };
+
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("All files are routed to 
success").build();
+
+    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("Query")
+        .description("The selection criteria; must be a valid BSON document; 
if omitted the entire collection will be queried")
+        .required(false)
+        .addValidator(DOCUMENT_VALIDATOR)
+        .build();
+    static final PropertyDescriptor PROJECTION = new 
PropertyDescriptor.Builder()
+        .name("Projection")
+        .description("The fields to be returned from the documents in the 
result set; must be a valid BSON document")
+        .required(false)
+        .addValidator(DOCUMENT_VALIDATOR)
+        .build();
+    static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
+        .name("Sort")
+        .description("The fields by which to sort; must be a valid BSON 
document")
+        .required(false)
+        .addValidator(DOCUMENT_VALIDATOR)
+        .build();
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+        .name("Limit")
+        .description("The maximum number of elements to return")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("The number of elements returned from the server in one 
batch")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+
+    private final List<PropertyDescriptor> descriptors;
+
+    private final Set<Relationship> relationships;
+
+    /** Builds the immutable relationship set and the immutable, ordered property list for this processor. */
+    public GetMongo() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(URI);
+        props.add(DATABASE_NAME);
+        props.add(COLLECTION_NAME);
+        props.add(QUERY);
+        props.add(PROJECTION);
+        props.add(SORT);
+        props.add(LIMIT);
+        props.add(BATCH_SIZE);
+        descriptors = Collections.unmodifiableList(props);
+    }
+
+    /** Returns the unmodifiable relationship set built in the constructor. */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /** Returns the unmodifiable property descriptor list built in the constructor. */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final ProcessorLog logger = getLogger();
+
+        final Document query = context.getProperty(QUERY).isSet() ? 
Document.parse(context.getProperty(QUERY).getValue()) : null;
+        final Document projection = context.getProperty(PROJECTION).isSet() ? 
Document.parse(context.getProperty(PROJECTION).getValue()) : null;
+        final Document sort = context.getProperty(SORT).isSet() ? 
Document.parse(context.getProperty(SORT).getValue()) : null;
+
+        final MongoCollection<Document> collection = getCollection(context);
+
+        try {
+            final FindIterable<Document> it = query != null ? 
collection.find(query) : collection.find();
+            if (projection != null) {
+                it.projection(projection);
+            }
+            if (sort != null) {
+                it.sort(sort);
+            }
+            if (context.getProperty(LIMIT).isSet()) {
+                it.limit(context.getProperty(LIMIT).asInteger());
+            }
+            if (context.getProperty(BATCH_SIZE).isSet()) {
+                it.batchSize(context.getProperty(BATCH_SIZE).asInteger());
+            }
+
+            final MongoCursor<Document> cursor = it.iterator();
+            try {
+                FlowFile flowFile = null;
+                while (cursor.hasNext()) {
+                    flowFile = session.create();
+                    flowFile = session.write(flowFile, new 
OutputStreamCallback() {
+                        @Override
+                        public void process(OutputStream out) throws 
IOException {
+                            IOUtils.write(cursor.next().toJson(), out);
+                        }
+                    });
+
+                    session.getProvenanceReporter().receive(flowFile, 
context.getProperty(URI).getValue());
+                    session.transfer(flowFile, REL_SUCCESS);
+                }
+
+                session.commit();
+
+            } finally {
+                cursor.close();
+            }
+
+        } catch (final RuntimeException e) {
+            context.yield();
+            session.rollback();
+            logger.error("Failed to execute query {} due to {}", new Object[] 
{ query, e }, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
new file mode 100644
index 0000000..a8bfc3e
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.bson.Document;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.UpdateOptions;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put" })
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB")
+public class PutMongo extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final String MODE_INSERT = "insert";
+    static final String MODE_UPDATE = "update";
+
+    static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
+    static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
+    static final String WRITE_CONCERN_FSYNCED = "FSYNCED";
+    static final String WRITE_CONCERN_JOURNALED = "JOURNALED";
+    static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = 
"REPLICA_ACKNOWLEDGED";
+    static final String WRITE_CONCERN_MAJORITY = "MAJORITY";
+
+    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+        .name("Mode")
+        .description("Indicates whether the processor should insert or update 
content")
+        .required(true)
+        .allowableValues(MODE_INSERT, MODE_UPDATE)
+        .defaultValue(MODE_INSERT)
+        .build();
+    static final PropertyDescriptor UPSERT = new PropertyDescriptor.Builder()
+        .name("Upsert")
+        .description("When true, inserts a document if no document matches the 
update query criteria; this property is valid only when using update mode, "
+                + "otherwise it is ignored")
+        .required(true)
+        .allowableValues("true", "false")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .defaultValue("false")
+        .build();
+    static final PropertyDescriptor UPDATE_QUERY_KEY = new 
PropertyDescriptor.Builder()
+        .name("Update Query Key")
+        .description("Key name used to build the update query criteria; this 
property is valid only when using update mode, "
+                + "otherwise it is ignored")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .defaultValue("_id")
+        .build();
+    static final PropertyDescriptor WRITE_CONCERN = new 
PropertyDescriptor.Builder()
+        .name("Write Concern")
+        .description("The write concern to use")
+        .required(true)
+        .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, 
WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
+            WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
+        .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
+        .build();
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final List<PropertyDescriptor> descriptors;
+
+    private final Set<Relationship> relationships;
+
+    /** Builds the immutable relationship set and the immutable, ordered property list for this processor. */
+    public PutMongo() {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(URI);
+        props.add(DATABASE_NAME);
+        props.add(COLLECTION_NAME);
+        props.add(MODE);
+        props.add(UPSERT);
+        props.add(UPDATE_QUERY_KEY);
+        props.add(WRITE_CONCERN);
+        props.add(CHARACTER_SET);
+        descriptors = Collections.unmodifiableList(props);
+    }
+
+    /** Returns the unmodifiable relationship set built in the constructor. */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /** Returns the unmodifiable property descriptor list built in the constructor. */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.descriptors;
+    }
+
+    /**
+     * Takes one FlowFile from the session, parses its content as a document using the
+     * configured character set, and either inserts it or replaces the document matching
+     * the update key, depending on the Mode property. The FlowFile is routed to
+     * REL_SUCCESS after the write; on any exception it is routed to REL_FAILURE and the
+     * processor yields.
+     *
+     * @param context source of the property values; also used to yield on failure
+     * @param session session providing the FlowFile and its content
+     * @throws ProcessException declared by the overridden method
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+
+        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final String mode = context.getProperty(MODE).getValue();
+        final WriteConcern writeConcern = getWriteConcern(context);
+
+        final MongoCollection<Document> collection = getCollection(context).withWriteConcern(writeConcern);
+
+        try {
+            // Read the contents of the FlowFile into a byte array
+            final byte[] content = new byte[(int) flowFile.getSize()];
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    StreamUtils.fillBuffer(in, content, true);
+                }
+            });
+
+            // parse
+            final Document doc = Document.parse(new String(content, charset));
+
+            if (MODE_INSERT.equalsIgnoreCase(mode)) {
+                collection.insertOne(doc);
+                logger.info("inserted {} into MongoDB", new Object[] { flowFile });
+            } else {
+                // update
+                final boolean upsert = context.getProperty(UPSERT).asBoolean();
+                final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
+                // NOTE(review): when the document has no updateKey field the criteria value is
+                // null - confirm that matching on a null key is the intended behaviour.
+                final Document query = new Document(updateKey, doc.get(updateKey));
+
+                collection.replaceOne(query, doc, new UpdateOptions().upsert(upsert));
+                logger.info("updated {} into MongoDB", new Object[] { flowFile });
+            }
+
+            session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue());
+            session.transfer(flowFile, REL_SUCCESS);
+
+        } catch (Exception e) {
+            // The throwable is passed to the processor log; no separate stack trace is written to stderr.
+            logger.error("Failed to insert {} into MongoDB due to {}", new Object[] { flowFile, e }, e);
+            session.transfer(flowFile, REL_FAILURE);
+            context.yield();
+        }
+    }
+
+    /**
+     * Maps the value of the "Write Concern" property to the matching {@link WriteConcern}
+     * constant; any value not listed falls back to ACKNOWLEDGED.
+     */
+    protected WriteConcern getWriteConcern(final ProcessContext context) {
+        final String configured = context.getProperty(WRITE_CONCERN).getValue();
+        switch (configured) {
+        case WRITE_CONCERN_UNACKNOWLEDGED:
+            return WriteConcern.UNACKNOWLEDGED;
+        case WRITE_CONCERN_FSYNCED:
+            return WriteConcern.FSYNCED;
+        case WRITE_CONCERN_JOURNALED:
+            return WriteConcern.JOURNALED;
+        case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
+            return WriteConcern.REPLICA_ACKNOWLEDGED;
+        case WRITE_CONCERN_MAJORITY:
+            return WriteConcern.MAJORITY;
+        case WRITE_CONCERN_ACKNOWLEDGED:
+        default:
+            return WriteConcern.ACKNOWLEDGED;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..92e1cf7
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.mongodb.GetMongo
+org.apache.nifi.processors.mongodb.PutMongo

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
new file mode 100644
index 0000000..53f47f2
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+
+public class GetMongoTest {
+    private static final String MONGO_URI = "mongodb://localhost";
+    private static final String DB_NAME = 
GetMongoTest.class.getSimpleName().toLowerCase();
+    private static final String COLLECTION_NAME = "test";
+
+    private static final List<Document> DOCUMENTS = Lists.newArrayList(
+        new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 
3),
+        new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 
4),
+        new Document("_id", "doc_3").append("a", 1).append("b", 3)
+    );
+
+    private TestRunner runner;
+    private MongoClient mongoClient;
+
+    @Before
+    public void setup() {
+        // processor under test, pointed at the local test database
+        runner = TestRunners.newTestRunner(GetMongo.class);
+        runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
+        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
+        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
+        // seed the collection the processor will read from
+        mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
+        final MongoCollection<Document> seeded = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
+        seeded.insertMany(DOCUMENTS);
+    }
+
+    @After
+    public void teardown() {
+        runner = null;
+        mongoClient.getDatabase(DB_NAME).drop();
+        mongoClient.close(); // release the connections opened in setup(); previously leaked once per test
+    }
+
+    /**
+     * Exercises property validation: the three connection properties are
+     * required, while Query, Projection and Sort may be absent but are
+     * rejected when they do not parse.
+     */
+    @Test
+    public void testValidators() {
+        final TestRunner validationRunner = TestRunners.newTestRunner(GetMongo.class);
+        Collection<ValidationResult> problems;
+        ProcessContext processContext;
+
+        // missing uri, db, collection
+        validationRunner.enqueue(new byte[0]);
+        processContext = validationRunner.getProcessContext();
+        problems = processContext instanceof MockProcessContext
+            ? ((MockProcessContext) processContext).validate()
+            : new HashSet<ValidationResult>();
+        Assert.assertEquals(3, problems.size());
+        final Iterator<ValidationResult> missing = problems.iterator();
+        Assert.assertTrue(missing.next().toString().contains("is invalid because Mongo URI is required"));
+        Assert.assertTrue(missing.next().toString().contains("is invalid because Mongo Database Name is required"));
+        Assert.assertTrue(missing.next().toString().contains("is invalid because Mongo Collection Name is required"));
+
+        // missing query - is ok
+        validationRunner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
+        validationRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
+        validationRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
+        validationRunner.enqueue(new byte[0]);
+        processContext = validationRunner.getProcessContext();
+        problems = processContext instanceof MockProcessContext
+            ? ((MockProcessContext) processContext).validate()
+            : new HashSet<ValidationResult>();
+        Assert.assertEquals(0, problems.size());
+
+        // invalid query
+        validationRunner.setProperty(GetMongo.QUERY, "{a: x,y,z}");
+        validationRunner.enqueue(new byte[0]);
+        processContext = validationRunner.getProcessContext();
+        problems = processContext instanceof MockProcessContext
+            ? ((MockProcessContext) processContext).validate()
+            : new HashSet<ValidationResult>();
+        Assert.assertEquals(1, problems.size());
+        Assert.assertTrue(problems.iterator().next().toString().matches("'Query' .* is invalid because org.bson.json.JsonParseException"));
+
+        // invalid projection
+        validationRunner.setProperty(GetMongo.QUERY, "{a: 1}");
+        validationRunner.setProperty(GetMongo.PROJECTION, "{a: x,y,z}");
+        validationRunner.enqueue(new byte[0]);
+        processContext = validationRunner.getProcessContext();
+        problems = processContext instanceof MockProcessContext
+            ? ((MockProcessContext) processContext).validate()
+            : new HashSet<ValidationResult>();
+        Assert.assertEquals(1, problems.size());
+        Assert.assertTrue(problems.iterator().next().toString().matches("'Projection' .* is invalid because org.bson.json.JsonParseException"));
+
+        // invalid sort
+        validationRunner.removeProperty(GetMongo.PROJECTION);
+        validationRunner.setProperty(GetMongo.SORT, "{a: x,y,z}");
+        validationRunner.enqueue(new byte[0]);
+        processContext = validationRunner.getProcessContext();
+        problems = processContext instanceof MockProcessContext
+            ? ((MockProcessContext) processContext).validate()
+            : new HashSet<ValidationResult>();
+        Assert.assertEquals(1, problems.size());
+        Assert.assertTrue(problems.iterator().next().toString().matches("'Sort' .* is invalid because org.bson.json.JsonParseException"));
+    }
+
+    @Test
+    public void testReadOneDocument() throws Exception {
+        // doc_3 is the only fixture with b == 3
+        runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
+        final MockFlowFile onlyMatch = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0);
+        onlyMatch.assertContentEquals(DOCUMENTS.get(2).toJson());
+    }
+
+    @Test
+    public void testReadMultipleDocuments() throws Exception {
+        runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
+        // each flow file must match the seeded document at the same position
+        final Iterator<Document> expected = DOCUMENTS.iterator();
+        for (final MockFlowFile actual : runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS)) {
+            actual.assertContentEquals(expected.next().toJson());
+        }
+    }
+
+    @Test
+    public void testProjection() throws Exception {
+        runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}");
+        runner.setProperty(GetMongo.PROJECTION, "{_id: 0, a: 1}");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
+        // the projection excludes _id and keeps only field a
+        final String expectedJson = new Document("a", 1).toJson();
+        final MockFlowFile projected = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0);
+        projected.assertContentEquals(expectedJson);
+    }
+
+    @Test
+    public void testSort() throws Exception {
+        runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
+        runner.setProperty(GetMongo.SORT, "{a: -1, b: -1, c: 1}");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
+        final int[] expectedOrder = {2, 0, 1}; // doc_3 (b=3) first, then doc_1 (c=3), doc_2 (c=4)
+        final List<MockFlowFile> sorted = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
+        for (int i = 0; i < expectedOrder.length; i++) {
+            sorted.get(i).assertContentEquals(DOCUMENTS.get(expectedOrder[i]).toJson());
+        }
+    }
+
+    @Test
+    public void testLimit() throws Exception {
+        // every fixture matches the query, so only the limit caps the output
+        runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
+        runner.setProperty(GetMongo.LIMIT, "1");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
+        final MockFlowFile first = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0);
+        first.assertContentEquals(DOCUMENTS.get(0).toJson());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
new file mode 100644
index 0000000..d1ba027
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+
+public class PutMongoTest {
+    private static final String MONGO_URI = "mongodb://localhost";
+    private static final String DATABASE_NAME = 
PutMongoTest.class.getSimpleName().toLowerCase();
+    private static final String COLLECTION_NAME = "test";
+
+    private static final List<Document> DOCUMENTS = Lists.newArrayList(
+            new Document("_id", "doc_1").append("a", 1).append("b", 
2).append("c", 3),
+            new Document("_id", "doc_2").append("a", 1).append("b", 
2).append("c", 4),
+            new Document("_id", "doc_3").append("a", 1).append("b", 3)
+        );
+
+    private TestRunner runner;
+    private MongoClient mongoClient;
+    private MongoCollection<Document> collection;
+
+    @Before
+    public void setup() {
+        // processor under test, pointed at the local test database
+        runner = TestRunners.newTestRunner(PutMongo.class);
+        runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
+        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
+        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
+        final MongoClientURI clientUri = new MongoClientURI(MONGO_URI);
+        mongoClient = new MongoClient(clientUri);
+        collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
+    }
+
+    @After
+    public void teardown() {
+        runner = null;
+        mongoClient.getDatabase(DATABASE_NAME).drop();
+        mongoClient.close(); // release the connections opened in setup(); previously leaked once per test
+    }
+
+    private byte[] documentToByteArray(Document doc) {
+        return doc.toJson().getBytes(UTF_8); // JSON text of the document, encoded as UTF-8
+    }
+
+    /**
+     * Exercises property validation, including the allowed values of Write Concern.
+     */
+    @Test
+    public void testValidators() {
+        final TestRunner validationRunner = TestRunners.newTestRunner(PutMongo.class);
+        Collection<ValidationResult> problems;
+        ProcessContext processContext;
+
+        // missing uri, db, collection
+        validationRunner.enqueue(new byte[0]);
+        processContext = validationRunner.getProcessContext();
+        problems = processContext instanceof MockProcessContext
+            ? ((MockProcessContext) processContext).validate()
+            : new HashSet<ValidationResult>();
+        Assert.assertEquals(3, problems.size());
+        final Iterator<ValidationResult> missing = problems.iterator();
+        Assert.assertTrue(missing.next().toString().contains("is invalid because Mongo URI is required"));
+        Assert.assertTrue(missing.next().toString().contains("is invalid because Mongo Database Name is required"));
+        Assert.assertTrue(missing.next().toString().contains("is invalid because Mongo Collection Name is required"));
+
+        // invalid write concern
+        validationRunner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
+        validationRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
+        validationRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
+        validationRunner.setProperty(PutMongo.WRITE_CONCERN, "xyz");
+        validationRunner.enqueue(new byte[0]);
+        processContext = validationRunner.getProcessContext();
+        problems = processContext instanceof MockProcessContext
+            ? ((MockProcessContext) processContext).validate()
+            : new HashSet<ValidationResult>();
+        Assert.assertEquals(1, problems.size());
+        Assert.assertTrue(problems.iterator().next().toString().matches("'Write Concern' .* is invalid because Given value not found in allowed set .*"));
+
+        // valid write concern
+        validationRunner.setProperty(PutMongo.WRITE_CONCERN, PutMongo.WRITE_CONCERN_UNACKNOWLEDGED);
+        validationRunner.enqueue(new byte[0]);
+        processContext = validationRunner.getProcessContext();
+        problems = processContext instanceof MockProcessContext
+            ? ((MockProcessContext) processContext).validate()
+            : new HashSet<ValidationResult>();
+        Assert.assertEquals(0, problems.size());
+    }
+
+    @Test
+    public void testInsertOne() throws Exception {
+        final Document expected = DOCUMENTS.get(0);
+        final byte[] payload = documentToByteArray(expected);
+
+        runner.enqueue(payload);
+        runner.run();
+
+        // the flow file is routed to success with the content it was enqueued with
+        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0).assertContentEquals(payload);
+
+        // verify 1 doc inserted into the collection
+        assertEquals(1, collection.count());
+        assertEquals(expected, collection.find().first());
+    }
+
+    @Test
+    public void testInsertMany() throws Exception {
+        for (final Document fixture : DOCUMENTS) {
+            runner.enqueue(documentToByteArray(fixture));
+        }
+        runner.run(3);
+        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 3);
+
+        final Iterator<Document> expected = DOCUMENTS.iterator();
+        for (final MockFlowFile actual : runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS)) {
+            actual.assertContentEquals(expected.next().toJson());
+        }
+
+        // verify 3 docs inserted into the collection
+        assertEquals(3, collection.count());
+    }
+
+    @Test
+    public void testInsertWithDuplicateKey() throws Exception {
+        // pre-insert one document
+        final Document duplicate = DOCUMENTS.get(0);
+        collection.insertOne(duplicate);
+
+        for (final Document fixture : DOCUMENTS) {
+            runner.enqueue(documentToByteArray(fixture));
+        }
+        runner.run(3);
+        // first doc failed, other 2 succeeded
+        runner.assertTransferCount(PutMongo.REL_FAILURE, 1);
+        final MockFlowFile rejected = runner.getFlowFilesForRelationship(PutMongo.REL_FAILURE).get(0);
+        rejected.assertContentEquals(documentToByteArray(duplicate));
+
+        runner.assertTransferCount(PutMongo.REL_SUCCESS, 2);
+        final Iterator<Document> remaining = DOCUMENTS.subList(1, DOCUMENTS.size()).iterator();
+        for (final MockFlowFile accepted : runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS)) {
+            accepted.assertContentEquals(remaining.next().toJson());
+        }
+
+        // verify 2 docs inserted into the collection for a total of 3
+        assertEquals(3, collection.count());
+    }
+
+    /**
+     * Verifies that 'update' does not insert when 'upsert' is false.
+     * @see #testUpsert()
+     */
+    @Test
+    public void testUpdateDoesNotInsert() throws Exception {
+        final byte[] payload = documentToByteArray(DOCUMENTS.get(0));
+
+        runner.setProperty(PutMongo.MODE, "update");
+        runner.enqueue(payload);
+        runner.run();
+
+        // the flow file is still routed to success, content unchanged
+        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
+        final MockFlowFile routed = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
+        routed.assertContentEquals(payload);
+
+        // nothing was in collection, so nothing to update since upsert defaults to false
+        assertEquals(0, collection.count());
+    }
+
+    /**
+     * Verifies that 'update' inserts the document when 'upsert' is true.
+     * @see #testUpdateDoesNotInsert()
+     */
+    @Test
+    public void testUpsert() throws Exception {
+        final Document expected = DOCUMENTS.get(0);
+        final byte[] payload = documentToByteArray(expected);
+
+        runner.setProperty(PutMongo.MODE, "update");
+        runner.setProperty(PutMongo.UPSERT, "true");
+        runner.enqueue(payload);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
+        final MockFlowFile routed = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
+        routed.assertContentEquals(payload);
+
+        // verify 1 doc inserted into the collection
+        assertEquals(1, collection.count());
+        assertEquals(expected, collection.find().first());
+    }
+
+    @Test
+    public void testUpdate() throws Exception {
+        // work on a copy so the shared static DOCUMENTS fixture is not mutated for other tests
+        Document doc = new Document(DOCUMENTS.get(0));
+
+        // pre-insert document
+        collection.insertOne(doc);
+
+        // modify the object
+        doc.put("abc", "123");
+        doc.put("xyz", "456");
+        doc.remove("c");
+        byte[] bytes = documentToByteArray(doc);
+
+        runner.setProperty(PutMongo.MODE, "update");
+        runner.enqueue(bytes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
+        MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
+        out.assertContentEquals(bytes);
+
+        assertEquals(1, collection.count());
+        assertEquals(doc, collection.find().first());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
new file mode 100644
index 0000000..0a58ff3
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.1.1-incubating-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-mongodb-bundle</artifactId>
+    <version>0.1.1-incubating-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-mongodb-processors</module>
+        <module>nifi-mongodb-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/pom.xml b/nifi/nifi-nar-bundles/pom.xml
index 21d0234..dff098b 100644
--- a/nifi/nifi-nar-bundles/pom.xml
+++ b/nifi/nifi-nar-bundles/pom.xml
@@ -33,13 +33,14 @@
         <module>nifi-standard-services</module>
         <module>nifi-update-attribute-bundle</module>
         <module>nifi-kafka-bundle</module>
-               <module>nifi-kite-bundle</module>
+        <module>nifi-kite-bundle</module>
         <module>nifi-solr-bundle</module>
-               <module>nifi-aws-bundle</module>
-               <module>nifi-social-media-bundle</module>
-               <module>nifi-geo-bundle</module>
-               <module>nifi-hl7-bundle</module>
-               <module>nifi-language-translation-bundle</module>
+        <module>nifi-aws-bundle</module>
+        <module>nifi-social-media-bundle</module>
+        <module>nifi-geo-bundle</module>
+        <module>nifi-hl7-bundle</module>
+        <module>nifi-language-translation-bundle</module>
+        <module>nifi-mongodb-bundle</module>
     </modules>
     <dependencyManagement>
         <dependencies>
@@ -66,7 +67,7 @@
                 <version>0.1.1-incubating-SNAPSHOT</version>
                 <scope>provided</scope>
             </dependency>
-                       <dependency>
+             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-http-context-map-api</artifactId>
                 <version>0.1.1-incubating-SNAPSHOT</version>
@@ -87,7 +88,7 @@
                 <artifactId>nifi-ssl-context-service</artifactId>
                 <version>0.1.1-incubating-SNAPSHOT</version>
             </dependency>
-                       <dependency>
+            <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-http-context-map</artifactId>
                 <version>0.1.1-incubating-SNAPSHOT</version>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/pom.xml b/nifi/pom.xml
index ebb9aa3..f8d2638 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -750,6 +750,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-mongodb-nar</artifactId>
+                <version>0.1.1-incubating-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-solr-nar</artifactId>
                 <version>0.1.1-incubating-SNAPSHOT</version>
                 <type>nar</type>

Reply via email to