Repository: incubator-nifi Updated Branches: refs/heads/develop b7ddf8945 -> 5273a63bf
NIFI-570: Added MongoDB put and get processors Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/add03e38 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/add03e38 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/add03e38 Branch: refs/heads/develop Commit: add03e3893bcb2cabf563f45d929b0f6ab0ffc65 Parents: b7ddf89 Author: Tim Reardon <[email protected]> Authored: Tue May 19 08:09:02 2015 -0400 Committer: Tim Reardon <[email protected]> Committed: Tue May 19 08:15:13 2015 -0400 ---------------------------------------------------------------------- nifi/nifi-assembly/NOTICE | 4 + nifi/nifi-assembly/pom.xml | 5 + .../nifi-mongodb-nar/pom.xml | 37 +++ .../nifi-mongodb-processors/pom.xml | 98 +++++++ .../mongodb/AbstractMongoProcessor.java | 93 +++++++ .../nifi/processors/mongodb/GetMongo.java | 184 ++++++++++++++ .../nifi/processors/mongodb/PutMongo.java | 217 ++++++++++++++++ .../org.apache.nifi.processor.Processor | 16 ++ .../nifi/processors/mongodb/GetMongoTest.java | 201 +++++++++++++++ .../nifi/processors/mongodb/PutMongoTest.java | 254 +++++++++++++++++++ .../nifi-mongodb-bundle/pom.xml | 35 +++ nifi/nifi-nar-bundles/pom.xml | 17 +- nifi/pom.xml | 6 + 13 files changed, 1159 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-assembly/NOTICE ---------------------------------------------------------------------- diff --git a/nifi/nifi-assembly/NOTICE b/nifi/nifi-assembly/NOTICE index 0edfd96..9d89639 100644 --- a/nifi/nifi-assembly/NOTICE +++ b/nifi/nifi-assembly/NOTICE @@ -491,6 +491,10 @@ The following binary components are provided under the Apache Software License v This product includes software developed by Saxonica (http://www.saxonica.com/). 
+  (ASLv2) MongoDB Java Driver
+    The following NOTICE information applies:
+      Copyright (C) 2008-2013 10gen, Inc.
+
   (ASLv2) Parquet MR
     The following NOTICE information applies:
       Parquet MR
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index 4a12b25..cfe1de6 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -164,6 +164,11 @@ language governing permissions and limitations under the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mongodb-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-solr-nar</artifactId>
             <type>nar</type>
         </dependency>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
new file mode 100644
index 0000000..a138f18
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-mongodb-bundle</artifactId>
+        <version>0.1.1-incubating-SNAPSHOT</version>
+    </parent>
+
+    <!-- NAR (NiFi archive) module; its only declared dependency is the MongoDB processors jar below. -->
+    <artifactId>nifi-mongodb-nar</artifactId>
+    <version>0.1.1-incubating-SNAPSHOT</version>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mongodb-processors</artifactId>
+            <version>0.1.1-incubating-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
new file mode 100644
index 0000000..381d7d6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-mongodb-bundle</artifactId>
+        <version>0.1.1-incubating-SNAPSHOT</version>
+    </parent>
+
+    <!-- No <version> here: this module inherits it from the parent declared above. -->
+    <artifactId>nifi-mongodb-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <!-- The only dependency in this POM that pins its own version. -->
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>3.0.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <!-- Test-scoped dependencies start here. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!--
+              Embedded MongoDB for the build: the "start" goal is bound to the test-compile phase and the
+              "stop" goal to prepare-package. The database directory and the log file are both placed
+              under ${project.build.directory}.
+            -->
+            <plugin>
+                <groupId>com.github.joelittlejohn.embedmongo</groupId>
+                <artifactId>embedmongo-maven-plugin</artifactId>
+                <version>0.1.12</version>
+                <executions>
+                    <execution>
+                        <id>start</id>
+                        <goals>
+                            <goal>start</goal>
+                        </goals>
+                        <phase>test-compile</phase>
+                        <configuration>
+                            <databaseDirectory>${project.build.directory}/embedmongo/db</databaseDirectory>
+                            <logging>file</logging>
+                            <logFile>${project.build.directory}/embedmongo.log</logFile>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>stop</id>
+                        <goals>
+                            <goal>stop</goal>
+                        </goals>
+                        <phase>prepare-package</phase>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
new file mode 100644
index 0000000..fae007f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import java.io.IOException;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.Document;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+/**
+ * Base class for the MongoDB processors in this package. It declares the
+ * connection properties (URI, database name, collection name) and owns the
+ * {@link MongoClient}: the client is created by the {@code @OnScheduled}
+ * method and closed by the {@code @OnStopped} method.
+ */
+public abstract class AbstractMongoProcessor extends AbstractProcessor {
+    protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
+        .name("Mongo URI")
+        .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+        .name("Mongo Database Name")
+        .description("The name of the database to use")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
+        .name("Mongo Collection Name")
+        .description("The name of the collection to use")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    // Null whenever no client is open; assigned in createClient and reset in closeClient.
+    protected MongoClient mongoClient;
+
+    /**
+     * Builds the {@link MongoClient} from the {@link #URI} property. A client
+     * that is still open from an earlier call is closed first.
+     *
+     * @param context supplies the value of the {@link #URI} property
+     * @throws IOException declared on the signature; any exception raised while
+     *         creating the client is logged and rethrown unchanged
+     */
+    @OnScheduled
+    public final void createClient(ProcessContext context) throws IOException {
+        if (mongoClient != null) {
+            closeClient();
+        }
+
+        getLogger().info("Creating MongoClient");
+
+        try {
+            final String uri = context.getProperty(URI).getValue();
+            mongoClient = new MongoClient(new MongoClientURI(uri));
+        } catch (Exception e) {
+            // Report the concrete subclass: this method is inherited, so a fixed
+            // processor name in the message would be wrong for other subclasses.
+            getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the client if one is open and clears the field; does nothing when
+     * no client exists.
+     */
+    @OnStopped
+    public final void closeClient() {
+        if (mongoClient != null) {
+            getLogger().info("Closing MongoClient");
+            mongoClient.close();
+            mongoClient = null;
+        }
+    }
+
+    /**
+     * Looks up the database named by the {@link #DATABASE_NAME} property on the
+     * current client.
+     *
+     * @param context supplies the value of the {@link #DATABASE_NAME} property
+     * @return the database handle returned by the client
+     */
+    protected MongoDatabase getDatabase(final ProcessContext context) {
+        final String databaseName = context.getProperty(DATABASE_NAME).getValue();
+        return mongoClient.getDatabase(databaseName);
+    }
+
+    /**
+     * Looks up the collection named by the {@link #COLLECTION_NAME} property in
+     * the database returned by {@link #getDatabase(ProcessContext)}.
+     *
+     * @param context supplies the database and collection property values
+     * @return the collection handle returned by the database
+     */
+    protected MongoCollection<Document> getCollection(final ProcessContext context) {
+        final String collectionName = context.getProperty(COLLECTION_NAME).getValue();
+        return getDatabase(context).getCollection(collectionName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
new file mode 100644
index 0000000..02ce9cf
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.Document;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+
+/**
+ * Runs a find on the configured collection and creates one FlowFile per
+ * returned document, with the document's JSON form as the FlowFile content.
+ * Query, projection, sort, limit and batch size are all optional properties.
+ */
+@Tags({ "mongodb", "read", "get" })
+@CapabilityDescription("Creates FlowFiles from documents in MongoDB")
+public class GetMongo extends AbstractMongoProcessor {
+    /**
+     * Accepts a value only when {@link Document#parse(String)} succeeds. On
+     * failure the explanation is the class name of the thrown exception.
+     */
+    public static final Validator DOCUMENT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            String reason = null;
+            try {
+                Document.parse(value);
+            } catch (final RuntimeException e) {
+                reason = e.getClass().getName();
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
+
+    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("Query")
+        .description("The selection criteria; must be a valid BSON document; if omitted the entire collection will be queried")
+        .required(false)
+        .addValidator(DOCUMENT_VALIDATOR)
+        .build();
+    static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
+        .name("Projection")
+        .description("The fields to be returned from the documents in the result set; must be a valid BSON document")
+        .required(false)
+        .addValidator(DOCUMENT_VALIDATOR)
+        .build();
+    static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
+        .name("Sort")
+        .description("The fields by which to sort; must be a valid BSON document")
+        .required(false)
+        .addValidator(DOCUMENT_VALIDATOR)
+        .build();
+    static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
+        .name("Limit")
+        .description("The maximum number of elements to return")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("The number of elements returned from the server in one batch")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+
+    private final List<PropertyDescriptor> descriptors;
+
+    private final Set<Relationship> relationships;
+
+    /**
+     * Registers the supported properties (connection properties first, then the
+     * query options) and the single "success" relationship as unmodifiable
+     * collections.
+     */
+    public GetMongo() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(URI);
+        descriptors.add(DATABASE_NAME);
+        descriptors.add(COLLECTION_NAME);
+        descriptors.add(QUERY);
+        descriptors.add(PROJECTION);
+        descriptors.add(SORT);
+        descriptors.add(LIMIT);
+        descriptors.add(BATCH_SIZE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    /**
+     * Executes the configured find and transfers one FlowFile per document to
+     * {@link #REL_SUCCESS}, then commits the session. A {@link RuntimeException}
+     * raised along the way makes the processor yield, rolls the session back
+     * and is logged; it is not rethrown.
+     *
+     * @param context supplies the property values
+     * @param session used to create, write, transfer and commit FlowFiles
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final ProcessorLog logger = getLogger();
+
+        // Unset optional properties stay null and are simply not applied to the find
+        final Document query = context.getProperty(QUERY).isSet() ? Document.parse(context.getProperty(QUERY).getValue()) : null;
+        final Document projection = context.getProperty(PROJECTION).isSet() ? Document.parse(context.getProperty(PROJECTION).getValue()) : null;
+        final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).getValue()) : null;
+
+        final MongoCollection<Document> collection = getCollection(context);
+
+        try {
+            final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
+            if (projection != null) {
+                it.projection(projection);
+            }
+            if (sort != null) {
+                it.sort(sort);
+            }
+            if (context.getProperty(LIMIT).isSet()) {
+                it.limit(context.getProperty(LIMIT).asInteger());
+            }
+            if (context.getProperty(BATCH_SIZE).isSet()) {
+                it.batchSize(context.getProperty(BATCH_SIZE).asInteger());
+            }
+
+            final MongoCursor<Document> cursor = it.iterator();
+            try {
+                while (cursor.hasNext()) {
+                    FlowFile flowFile = session.create();
+                    flowFile = session.write(flowFile, new OutputStreamCallback() {
+                        @Override
+                        public void process(OutputStream out) throws IOException {
+                            // Encode explicitly as UTF-8; the two-argument overload
+                            // would use the JVM's platform default charset instead.
+                            IOUtils.write(cursor.next().toJson(), out, "UTF-8");
+                        }
+                    });
+
+                    session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
+                    session.transfer(flowFile, REL_SUCCESS);
+                }
+
+                session.commit();
+
+            } finally {
+                // Close the cursor whether or not the loop completed
+                cursor.close();
+            }
+
+        } catch (final RuntimeException e) {
+            context.yield();
+            session.rollback();
+            logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
new file mode 100644
index 0000000..a8bfc3e
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.bson.Document;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.UpdateOptions;
+
+/**
+ * Parses the content of each incoming FlowFile as a document and either
+ * inserts it into the configured collection or uses it to replace an existing
+ * document, depending on the "Mode" property.
+ */
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put" })
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB")
+public class PutMongo extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+        .description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
+    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+        .description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
+
+    static final String MODE_INSERT = "insert";
+    static final String MODE_UPDATE = "update";
+
+    static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
+    static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
+    static final String WRITE_CONCERN_FSYNCED = "FSYNCED";
+    static final String WRITE_CONCERN_JOURNALED = "JOURNALED";
+    static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
+    static final String WRITE_CONCERN_MAJORITY = "MAJORITY";
+
+    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+        .name("Mode")
+        .description("Indicates whether the processor should insert or update content")
+        .required(true)
+        .allowableValues(MODE_INSERT, MODE_UPDATE)
+        .defaultValue(MODE_INSERT)
+        .build();
+    static final PropertyDescriptor UPSERT = new PropertyDescriptor.Builder()
+        .name("Upsert")
+        .description("When true, inserts a document if no document matches the update query criteria; this property is valid only when using update mode, " + "otherwise it is ignored")
+        .required(true)
+        .allowableValues("true", "false")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .defaultValue("false")
+        .build();
+    static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder()
+        .name("Update Query Key")
+        .description("Key name used to build the update query criteria; this property is valid only when using update mode, " + "otherwise it is ignored")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .defaultValue("_id")
+        .build();
+    static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
+        .name("Write Concern")
+        .description("The write concern to use")
+        .required(true)
+        .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
+            WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
+        .defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
+        .build();
+    static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final List<PropertyDescriptor> descriptors;
+
+    private final Set<Relationship> relationships;
+
+    /**
+     * Registers the supported properties (connection properties first, then the
+     * write options) and the "success" and "failure" relationships as
+     * unmodifiable collections.
+     */
+    public PutMongo() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(URI);
+        descriptors.add(DATABASE_NAME);
+        descriptors.add(COLLECTION_NAME);
+        descriptors.add(MODE);
+        descriptors.add(UPSERT);
+        descriptors.add(UPDATE_QUERY_KEY);
+        descriptors.add(WRITE_CONCERN);
+        descriptors.add(CHARACTER_SET);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    /**
+     * Takes one FlowFile from the session, decodes its content with the
+     * configured character set, parses it as a document and writes it to the
+     * collection. In insert mode the document is inserted; otherwise it
+     * replaces the document matched on the "Update Query Key" field, with the
+     * upsert option taken from the "Upsert" property. Successful FlowFiles go
+     * to {@link #REL_SUCCESS}. Any exception is logged, the FlowFile goes to
+     * {@link #REL_FAILURE} and the processor yields.
+     *
+     * @param context supplies the property values
+     * @param session supplies the FlowFile and receives the transfer
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+
+        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final String mode = context.getProperty(MODE).getValue();
+        final WriteConcern writeConcern = getWriteConcern(context);
+
+        final MongoCollection<Document> collection = getCollection(context).withWriteConcern(writeConcern);
+
+        try {
+            // Read the contents of the FlowFile into a byte array
+            final byte[] content = new byte[(int) flowFile.getSize()];
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    StreamUtils.fillBuffer(in, content, true);
+                }
+            });
+
+            // parse
+            final Document doc = Document.parse(new String(content, charset));
+
+            if (MODE_INSERT.equalsIgnoreCase(mode)) {
+                collection.insertOne(doc);
+                logger.info("inserted {} into MongoDB", new Object[] { flowFile });
+            } else {
+                // update: match on the value the incoming document holds for the update key
+                final boolean upsert = context.getProperty(UPSERT).asBoolean();
+                final String updateKey = context.getProperty(UPDATE_QUERY_KEY).getValue();
+                final Document query = new Document(updateKey, doc.get(updateKey));
+
+                collection.replaceOne(query, doc, new UpdateOptions().upsert(upsert));
+                logger.info("updated {} into MongoDB", new Object[] { flowFile });
+            }
+
+            session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue());
+            session.transfer(flowFile, REL_SUCCESS);
+
+        } catch (Exception e) {
+            // The processor log already receives the exception and its stack trace;
+            // nothing is printed to stderr.
+            logger.error("Failed to insert {} into MongoDB due to {}", new Object[] { flowFile, e }, e);
+            session.transfer(flowFile, REL_FAILURE);
+            context.yield();
+        }
+    }
+
+    /**
+     * Maps the "Write Concern" property value to the matching
+     * {@link WriteConcern} constant. A value that matches none of the known
+     * names yields {@link WriteConcern#ACKNOWLEDGED}.
+     *
+     * @param context supplies the value of the {@link #WRITE_CONCERN} property
+     * @return the write concern to apply to the collection
+     */
+    protected WriteConcern getWriteConcern(final ProcessContext context) {
+        final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue();
+        WriteConcern writeConcern = null;
+        switch (writeConcernProperty) {
+        case WRITE_CONCERN_ACKNOWLEDGED:
+            writeConcern = WriteConcern.ACKNOWLEDGED;
+            break;
+        case WRITE_CONCERN_UNACKNOWLEDGED:
+            writeConcern = WriteConcern.UNACKNOWLEDGED;
+            break;
+        case WRITE_CONCERN_FSYNCED:
+            writeConcern = WriteConcern.FSYNCED;
+            break;
+        case WRITE_CONCERN_JOURNALED:
+            writeConcern = WriteConcern.JOURNALED;
+            break;
+        case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
+            writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
+            break;
+        case WRITE_CONCERN_MAJORITY:
+            writeConcern = WriteConcern.MAJORITY;
+            break;
+        default:
+            writeConcern = WriteConcern.ACKNOWLEDGED;
+        }
+        return writeConcern;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..92e1cf7
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.mongodb.GetMongo +org.apache.nifi.processors.mongodb.PutMongo http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java new file mode 100644 index 0000000..53f47f2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.mongodb; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.bson.Document; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; + +public class GetMongoTest { + private static final String MONGO_URI = "mongodb://localhost"; + private static final String DB_NAME = GetMongoTest.class.getSimpleName().toLowerCase(); + private static final String COLLECTION_NAME = "test"; + + private static final List<Document> DOCUMENTS = Lists.newArrayList( + new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3), + new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4), + new Document("_id", "doc_3").append("a", 1).append("b", 3) + ); + + private TestRunner runner; + private MongoClient mongoClient; + + @Before + public void setup() { + runner = TestRunners.newTestRunner(GetMongo.class); + runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); + runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME); + runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); + + mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); + + MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME); + collection.insertMany(DOCUMENTS); + } + + @After + public void teardown() { + runner = null; + + mongoClient.getDatabase(DB_NAME).drop(); + } + + @Test + public void testValidators() { + TestRunner 
runner = TestRunners.newTestRunner(GetMongo.class); + Collection<ValidationResult> results; + ProcessContext pc; + + // missing uri, db, collection + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + results = new HashSet<>(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(3, results.size()); + Iterator<ValidationResult> it = results.iterator(); + Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required")); + Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required")); + Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required")); + + // missing query - is ok + runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); + runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME); + runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + results = new HashSet<>(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(0, results.size()); + + // invalid query + runner.setProperty(GetMongo.QUERY, "{a: x,y,z}"); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + results = new HashSet<>(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is invalid because org.bson.json.JsonParseException")); + + // invalid projection + runner.setProperty(GetMongo.QUERY, "{a: 1}"); + runner.setProperty(GetMongo.PROJECTION, "{a: x,y,z}"); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + results = new HashSet<>(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + 
Assert.assertEquals(1, results.size()); + Assert.assertTrue(results.iterator().next().toString().matches("'Projection' .* is invalid because org.bson.json.JsonParseException")); + + // invalid sort + runner.removeProperty(GetMongo.PROJECTION); + runner.setProperty(GetMongo.SORT, "{a: x,y,z}"); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + results = new HashSet<>(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + Assert.assertTrue(results.iterator().next().toString().matches("'Sort' .* is invalid because org.bson.json.JsonParseException")); + } + + @Test + public void testReadOneDocument() throws Exception { + runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}"); + runner.run(); + + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson()); + } + + @Test + public void testReadMultipleDocuments() throws Exception { + runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}"); + runner.run(); + + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + for (int i=0; i < flowFiles.size(); i++) { + flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson()); + } + } + + @Test + public void testProjection() throws Exception { + runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}"); + runner.setProperty(GetMongo.PROJECTION, "{_id: 0, a: 1}"); + runner.run(); + + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + Document expected = new Document("a", 1); + flowFiles.get(0).assertContentEquals(expected.toJson()); + } + + @Test + public void testSort() throws Exception { + 
runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}"); + runner.setProperty(GetMongo.SORT, "{a: -1, b: -1, c: 1}"); + runner.run(); + + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson()); + flowFiles.get(1).assertContentEquals(DOCUMENTS.get(0).toJson()); + flowFiles.get(2).assertContentEquals(DOCUMENTS.get(1).toJson()); + } + + @Test + public void testLimit() throws Exception { + runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}"); + runner.setProperty(GetMongo.LIMIT, "1"); + runner.run(); + + runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); + flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java new file mode 100644 index 0000000..d1ba027 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mongodb; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertEquals; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.bson.Document; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; + +public class PutMongoTest { + private static final String MONGO_URI = "mongodb://localhost"; + private static final String DATABASE_NAME = PutMongoTest.class.getSimpleName().toLowerCase(); + private static final String COLLECTION_NAME = "test"; + + private static final List<Document> DOCUMENTS = Lists.newArrayList( + new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3), + new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4), + new Document("_id", "doc_3").append("a", 1).append("b", 3) + ); + + private TestRunner runner; + private 
MongoClient mongoClient; + private MongoCollection<Document> collection; + + @Before + public void setup() { + runner = TestRunners.newTestRunner(PutMongo.class); + runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); + runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME); + runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); + + mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); + + collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME); + } + + @After + public void teardown() { + runner = null; + + mongoClient.getDatabase(DATABASE_NAME).drop(); + } + + private byte[] documentToByteArray(Document doc) { + return doc.toJson().getBytes(UTF_8); + } + + @Test + public void testValidators() { + TestRunner runner = TestRunners.newTestRunner(PutMongo.class); + Collection<ValidationResult> results; + ProcessContext pc; + + // missing uri, db, collection + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + results = new HashSet<>(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(3, results.size()); + Iterator<ValidationResult> it = results.iterator(); + Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required")); + Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required")); + Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required")); + + // invalid write concern + runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); + runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME); + runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); + runner.setProperty(PutMongo.WRITE_CONCERN, "xyz"); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + results = new HashSet<>(); + if (pc instanceof MockProcessContext) { + results = 
((MockProcessContext) pc).validate(); + } + Assert.assertEquals(1, results.size()); + Assert.assertTrue(results.iterator().next().toString().matches("'Write Concern' .* is invalid because Given value not found in allowed set .*")); + + // valid write concern + runner.setProperty(PutMongo.WRITE_CONCERN, PutMongo.WRITE_CONCERN_UNACKNOWLEDGED); + runner.enqueue(new byte[0]); + pc = runner.getProcessContext(); + results = new HashSet<>(); + if (pc instanceof MockProcessContext) { + results = ((MockProcessContext) pc).validate(); + } + Assert.assertEquals(0, results.size()); + } + + @Test + public void testInsertOne() throws Exception { + Document doc = DOCUMENTS.get(0); + byte[] bytes = documentToByteArray(doc); + + runner.enqueue(bytes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1); + MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0); + out.assertContentEquals(bytes); + + // verify 1 doc inserted into the collection + assertEquals(1, collection.count()); + assertEquals(doc, collection.find().first()); + } + + @Test + public void testInsertMany() throws Exception { + for (Document doc : DOCUMENTS) { + runner.enqueue(documentToByteArray(doc)); + } + runner.run(3); + + runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 3); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS); + for (int i=0; i < flowFiles.size(); i++) { + flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson()); + } + + // verify 3 docs inserted into the collection + assertEquals(3, collection.count()); + } + + @Test + public void testInsertWithDuplicateKey() throws Exception { + // pre-insert one document + collection.insertOne(DOCUMENTS.get(0)); + + for (Document doc : DOCUMENTS) { + runner.enqueue(documentToByteArray(doc)); + } + runner.run(3); + + // first doc failed, other 2 succeeded + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + MockFlowFile out = 
runner.getFlowFilesForRelationship(PutMongo.REL_FAILURE).get(0); + out.assertContentEquals(documentToByteArray(DOCUMENTS.get(0))); + + runner.assertTransferCount(PutMongo.REL_SUCCESS, 2); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS); + for (int i=0; i < flowFiles.size(); i++) { + flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i+1).toJson()); + } + + // verify 2 docs inserted into the collection for a total of 3 + assertEquals(3, collection.count()); + } + + /** + * Verifies that 'update' does not insert if 'upsert' is false. + * @see #testUpsert() + */ + @Test + public void testUpdateDoesNotInsert() throws Exception { + Document doc = DOCUMENTS.get(0); + byte[] bytes = documentToByteArray(doc); + + runner.setProperty(PutMongo.MODE, "update"); + runner.enqueue(bytes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1); + MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0); + out.assertContentEquals(bytes); + + // nothing was in collection, so nothing to update since upsert defaults to false + assertEquals(0, collection.count()); + } + + /** + * Verifies that 'update' does insert if 'upsert' is true. 
+ * @see #testUpdateDoesNotInsert() + */ + @Test + public void testUpsert() throws Exception { + Document doc = DOCUMENTS.get(0); + byte[] bytes = documentToByteArray(doc); + + runner.setProperty(PutMongo.MODE, "update"); + runner.setProperty(PutMongo.UPSERT, "true"); + runner.enqueue(bytes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1); + MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0); + out.assertContentEquals(bytes); + + // verify 1 doc inserted into the collection + assertEquals(1, collection.count()); + assertEquals(doc, collection.find().first()); + } + + @Test + public void testUpdate() throws Exception { + Document doc = DOCUMENTS.get(0); + + // pre-insert document + collection.insertOne(doc); + + // modify the object + doc.put("abc", "123"); + doc.put("xyz", "456"); + doc.remove("c"); + + byte[] bytes = documentToByteArray(doc); + + runner.setProperty(PutMongo.MODE, "update"); + runner.enqueue(bytes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1); + MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0); + out.assertContentEquals(bytes); + + assertEquals(1, collection.count()); + assertEquals(doc, collection.find().first()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml new file mode 100644 index 0000000..0a58ff3 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>0.1.1-incubating-SNAPSHOT</version> + </parent> + + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mongodb-bundle</artifactId> + <version>0.1.1-incubating-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>nifi-mongodb-processors</module> + <module>nifi-mongodb-nar</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/pom.xml b/nifi/nifi-nar-bundles/pom.xml index 21d0234..dff098b 100644 --- a/nifi/nifi-nar-bundles/pom.xml +++ b/nifi/nifi-nar-bundles/pom.xml @@ -33,13 +33,14 @@ <module>nifi-standard-services</module> <module>nifi-update-attribute-bundle</module> <module>nifi-kafka-bundle</module> - <module>nifi-kite-bundle</module> + <module>nifi-kite-bundle</module> <module>nifi-solr-bundle</module> - <module>nifi-aws-bundle</module> - <module>nifi-social-media-bundle</module> - <module>nifi-geo-bundle</module> - <module>nifi-hl7-bundle</module> - 
<module>nifi-language-translation-bundle</module> + <module>nifi-aws-bundle</module> + <module>nifi-social-media-bundle</module> + <module>nifi-geo-bundle</module> + <module>nifi-hl7-bundle</module> + <module>nifi-language-translation-bundle</module> + <module>nifi-mongodb-bundle</module> </modules> <dependencyManagement> <dependencies> @@ -66,7 +67,7 @@ <version>0.1.1-incubating-SNAPSHOT</version> <scope>provided</scope> </dependency> - <dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-http-context-map-api</artifactId> <version>0.1.1-incubating-SNAPSHOT</version> @@ -87,7 +88,7 @@ <artifactId>nifi-ssl-context-service</artifactId> <version>0.1.1-incubating-SNAPSHOT</version> </dependency> - <dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-http-context-map</artifactId> <version>0.1.1-incubating-SNAPSHOT</version> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/add03e38/nifi/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/pom.xml b/nifi/pom.xml index ebb9aa3..f8d2638 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -750,6 +750,12 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mongodb-nar</artifactId> + <version>0.1.1-incubating-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-solr-nar</artifactId> <version>0.1.1-incubating-SNAPSHOT</version> <type>nar</type>
