Repository: nifi Updated Branches: refs/heads/master e34d653ba -> e603c486f
NIFI-5333 Added GetMongoRecord. Signed-off-by: zenfenan <[email protected]> This closes #3011 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e603c486 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e603c486 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e603c486 Branch: refs/heads/master Commit: e603c486f49f058d5f37892d172c1d6dddff55c7 Parents: e34d653 Author: Mike Thomsen <[email protected]> Authored: Sun Sep 2 15:58:33 2018 -0400 Committer: zenfenan <[email protected]> Committed: Fri Nov 23 15:01:45 2018 +0530 ---------------------------------------------------------------------- .../record/util/DataTypeUtils.java | 5 + .../nifi-mongodb-processors/pom.xml | 1 + .../mongodb/AbstractMongoQueryProcessor.java | 150 ++++++++++++++ .../nifi/processors/mongodb/GetMongo.java | 113 +--------- .../nifi/processors/mongodb/GetMongoRecord.java | 205 +++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../additionalDetails.html | 59 ++++++ .../processors/mongodb/GetMongoRecordIT.groovy | 179 ++++++++++++++++ 8 files changed, 602 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 336a70d..f206a64 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -602,6 +602,11 @@ public class DataTypeUtils { return null; } + if (value 
instanceof java.util.Date) { + java.util.Date _temp = (java.util.Date)value; + return new Date(_temp.getTime()); + } + if (value instanceof Date) { return (Date) value; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml index 576fb1a..2a2ae87 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml @@ -101,6 +101,7 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mongodb-client-service-api</artifactId> + <version>1.9.0-SNAPSHOT</version> <scope>compile</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java new file mode 100644 index 0000000..6660551 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoQueryProcessor.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.client.MongoCollection; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.JsonValidator; +import org.apache.nifi.processor.util.StandardValidators; +import org.bson.Document; + +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; + +public abstract class AbstractMongoQueryProcessor extends AbstractMongoProcessor { + public static final String DB_NAME = "mongo.database.name"; + public static final String COL_NAME = "mongo.collection.name"; + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that have the results of a successful query execution go here.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All input FlowFiles that are part of a failed query execution go here.") + .build(); + + public static final Relationship 
REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("All input FlowFiles that are part of a successful query execution go here.") + .build(); + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("Query") + .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" + + " an incoming connection from another processor to provide the query as a valid JSON document inside of " + + "the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " + + "that will result in a full collection fetch using a \"{}\" query.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(JsonValidator.INSTANCE) + .build(); + + public static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() + .name("Projection") + .description("The fields to be returned from the documents in the result set; must be a valid BSON document") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(JsonValidator.INSTANCE) + .build(); + + public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder() + .name("Sort") + .description("The fields by which to sort; must be a valid BSON document") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(JsonValidator.INSTANCE) + .build(); + + public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("Limit") + .description("The maximum number of elements to return") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The number of elements 
to be returned from the server in one batch") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() + .name("results-per-flowfile") + .displayName("Results Per FlowFile") + .description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + protected Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) { + Document query = null; + if (context.getProperty(QUERY).isSet()) { + query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue()); + } else if (!context.getProperty(QUERY).isSet() && input == null) { + query = Document.parse("{}"); + } else { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + session.exportTo(input, out); + out.close(); + query = Document.parse(new String(out.toByteArray())); + } catch (Exception ex) { + getLogger().error("Error reading FlowFile : ", ex); + if (input != null) { //Likely culprit is a bad query + session.transfer(input, REL_FAILURE); + session.commit(); + } else { + throw new ProcessException(ex); + } + } + } + + return query; + } + + protected Map<String, String> getAttributes(ProcessContext context, FlowFile input, Document query, MongoCollection collection) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + + if (context.getProperty(QUERY_ATTRIBUTE).isSet()) { + final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue(); + attributes.put(queryAttr, query.toJson()); + } + + 
attributes.put(DB_NAME, collection.getNamespace().getDatabaseName()); + attributes.put(COL_NAME, collection.getNamespace().getCollectionName()); + + return attributes; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index b8d2f8d..ff65f86 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -32,25 +32,19 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; -import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.JsonValidator; -import org.apache.nifi.processor.util.StandardValidators; import org.bson.Document; import org.bson.json.JsonWriterSettings; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import 
java.util.HashSet; import java.util.List; import java.util.Map; @@ -63,75 +57,8 @@ import java.util.Set; @WritesAttribute(attribute = GetMongo.DB_NAME, description = "The database where the results came from."), @WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.") }) -public class GetMongo extends AbstractMongoProcessor { - static final String DB_NAME = "mongo.database.name"; - static final String COL_NAME = "mongo.collection.name"; +public class GetMongo extends AbstractMongoQueryProcessor { - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles that have the results of a successful query execution go here.") - .build(); - - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("All input FlowFiles that are part of a failed query execution go here.") - .build(); - - static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("All input FlowFiles that are part of a successful query execution go here.") - .build(); - - static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() - .name("Query") - .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" + - " an incoming connection from another processor to provide the query as a valid JSON document inside of " + - "the FlowFile's body. 
If this field is left blank and a timer is enabled instead of an incoming connection, " + - "that will result in a full collection fetch using a \"{}\" query.") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(JsonValidator.INSTANCE) - .build(); - - static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() - .name("Projection") - .description("The fields to be returned from the documents in the result set; must be a valid BSON document") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(JsonValidator.INSTANCE) - .build(); - - static final PropertyDescriptor SORT = new PropertyDescriptor.Builder() - .name("Sort") - .description("The fields by which to sort; must be a valid BSON document") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(JsonValidator.INSTANCE) - .build(); - - static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() - .name("Limit") - .description("The maximum number of elements to return") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .build(); - - static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The number of elements to be returned from the server in one batch") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .build(); - static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() - .name("results-per-flowfile") - .displayName("Results Per FlowFile") - .description("How many results to put into a FlowFile at once. 
The whole body will be treated as a JSON array of results.") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .build(); static final AllowableValue YES_PP = new AllowableValue("true", "True"); static final AllowableValue NO_PP = new AllowableValue("false", "False"); @@ -231,14 +158,7 @@ public class GetMongo extends AbstractMongoProcessor { final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue(); final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue()); - final Map<String, String> attributes = new HashMap<>(); - - attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); - if (context.getProperty(QUERY_ATTRIBUTE).isSet()) { - final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue(); - attributes.put(queryAttr, query.toJson()); - } final Document projection = context.getProperty(PROJECTION).isSet() ? 
Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null; @@ -250,9 +170,7 @@ public class GetMongo extends AbstractMongoProcessor { final MongoCollection<Document> collection = getCollection(context, input); final FindIterable<Document> it = collection.find(query); - - attributes.put(DB_NAME, collection.getNamespace().getDatabaseName()); - attributes.put(COL_NAME, collection.getNamespace().getCollectionName()); + final Map<String, String> attributes = getAttributes(context, input, query, collection); if (projection != null) { it.projection(projection); @@ -319,31 +237,4 @@ public class GetMongo extends AbstractMongoProcessor { } } - - private Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) { - Document query = null; - if (context.getProperty(QUERY).isSet()) { - query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue()); - } else if (!context.getProperty(QUERY).isSet() && input == null) { - query = Document.parse("{}"); - } else { - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - session.exportTo(input, out); - out.close(); - query = Document.parse(new String(out.toByteArray())); - } catch (Exception ex) { - logger.error("Error reading FlowFile : ", ex); - if (input != null) { //Likely culprit is a bad query - session.transfer(input, REL_FAILURE); - session.commit(); - } else { - throw new ProcessException(ex); - } - } - } - - return query; - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java new file mode 100644 index 0000000..49878ba --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongoRecord.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb; + +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.mongodb.MongoDBClientService; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.bson.Document; +import org.bson.types.ObjectId; + +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@CapabilityDescription("A record-based version of GetMongo that uses the Record writers to write the MongoDB result set.") +@Tags({"mongo", "mongodb", "get", "fetch", "record", "json"}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@WritesAttributes({ + @WritesAttribute(attribute = GetMongo.DB_NAME, description = "The 
database where the results came from."), + @WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.") +}) +public class GetMongoRecord extends AbstractMongoQueryProcessor { + public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder() + .name("get-mongo-record-writer-factory") + .displayName("Record Writer") + .description("The record writer to use to write the result sets.") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder() + .name("mongodb-schema-name") + .displayName("Schema Name") + .description("The name of the schema in the configured schema registry to use for the query results.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("${schema.name}") + .required(true) + .build(); + + private static final List<PropertyDescriptor> DESCRIPTORS; + private static final Set<Relationship> RELATIONSHIPS; + + static { + List<PropertyDescriptor> _temp = new ArrayList<>(); + _temp.add(CLIENT_SERVICE); + _temp.add(WRITER_FACTORY); + _temp.add(DATABASE_NAME); + _temp.add(COLLECTION_NAME); + _temp.add(SCHEMA_NAME); + _temp.add(QUERY_ATTRIBUTE); + _temp.add(QUERY); + _temp.add(PROJECTION); + _temp.add(SORT); + _temp.add(LIMIT); + _temp.add(BATCH_SIZE); + + DESCRIPTORS = Collections.unmodifiableList(_temp); + + Set<Relationship> _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_ORIGINAL); + RELATIONSHIPS = Collections.unmodifiableSet(_rels); + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + private volatile MongoDBClientService clientService; + private volatile 
RecordSetWriterFactory writerFactory; + + @OnScheduled + public void onEnabled(ProcessContext context) { + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class); + writerFactory = context.getProperty(WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile input = null; + + if (context.hasIncomingConnection()) { + input = session.get(); + if (input == null && context.hasNonLoopConnection()) { + return; + } + } + + final String database = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(input).getValue(); + final String collection = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(input).getValue(); + final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue(); + final Document query = getQuery(context, session, input); + + MongoCollection mongoCollection = clientService.getDatabase(database).getCollection(collection); + + FindIterable<Document> find = mongoCollection.find(query); + if (context.getProperty(SORT).isSet()) { + find = find.sort(Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue())); + } + if (context.getProperty(PROJECTION).isSet()) { + find = find.projection(Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue())); + } + if (context.getProperty(LIMIT).isSet()) { + find = find.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger()); + } + + MongoCursor<Document> cursor = find.iterator(); + + FlowFile output = input != null ? 
session.create(input) : session.create(); + final FlowFile inputPtr = input; + try { + final Map<String, String> attributes = getAttributes(context, input, query, mongoCollection); + try (OutputStream out = session.write(output)) { + Map<String, String> attrs = inputPtr != null ? inputPtr.getAttributes() : new HashMap<String, String>(){{ + put("schema.name", schemaName); + }}; + RecordSchema schema = writerFactory.getSchema(attrs, null); + RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out); + long count = 0L; + writer.beginRecordSet(); + while (cursor.hasNext()) { + Document next = cursor.next(); + if (next.get("_id") instanceof ObjectId) { + next.put("_id", next.get("_id").toString()); + } + Record record = new MapRecord(schema, next); + writer.write(record); + count++; + } + writer.finishRecordSet(); + writer.close(); + out.close(); + attributes.put("record.count", String.valueOf(count)); + } catch (SchemaNotFoundException e) { + throw new RuntimeException(e); + } + + + output = session.putAllAttributes(output, attributes); + + session.getProvenanceReporter().fetch(output, getURI(context)); + session.transfer(output, REL_SUCCESS); + if (input != null) { + session.transfer(input, REL_ORIGINAL); + } + } catch (Exception ex) { + ex.printStackTrace(); + getLogger().error("Error writing record set from Mongo query.", ex); + session.remove(output); + if (input != null) { + session.transfer(input, REL_FAILURE); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 9e7bc08..bfe2b74 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -15,6 +15,7 @@ org.apache.nifi.processors.mongodb.DeleteMongo org.apache.nifi.processors.mongodb.GetMongo +org.apache.nifi.processors.mongodb.GetMongoRecord org.apache.nifi.processors.mongodb.RunMongoAggregation org.apache.nifi.processors.mongodb.PutMongo org.apache.nifi.processors.mongodb.PutMongoRecord \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html new file mode 100644 index 0000000..2a686cf --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.GetMongoRecord/additionalDetails.html @@ -0,0 +1,59 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>GetMongoRecord</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> +</head> + +<body> +<!-- Processor Documentation ================================================== --> +<h2>Description:</h2> +<p> + This processor runs queries against a MongoDB instance or cluster and writes the results to a flowfile. It allows + input, but can run standalone as well. It is a record-aware version of the <em>GetMongo</em> processor. +</p> +<h2>Specifying the Query</h2> +<p> + The query can be specified in one of three ways: +</p> +<ul> + <li>Query configuration property.</li> + <li>Query Attribute configuration property.</li> + <li>FlowFile content.</li> +</ul> +<p> + If a value is specified in either of the configuration properties, it will not look in the FlowFile content for a + query. +</p> +<h2>Limiting/Shaping Results</h2> +<p> + The following options for limiting/shaping results are available: +</p> +<ul> + <li>Limit - limit the number of results. This should not be confused with the "batch size" option which is a + setting for the underlying MongoDB driver to tell it how many items to retrieve in each poll of the server.</li> + <li>Sort - sort the result set. Requires a JSON document like <em>{ "someDate": -1 }</em></li> + <li>Projection - control which fields to return. 
Example, which would remove <em>_id</em>: <em>{ "_id": 0 }</em></li> +</ul> +<h2>Misc Options</h2> +<p> + Results Per FlowFile, if set, creates a JSON array out of a batch of results and writes the result to the output. + Pretty Print, if enabled, will format the JSON data to be easily read by a human (ex. proper indentation of fields). +</p> +</body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/e603c486/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy new file mode 100644 index 0000000..fe21977 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/groovy/org/apache/nifi/processors/mongodb/GetMongoRecordIT.groovy @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb + +import groovy.json.JsonSlurper +import org.apache.nifi.flowfile.attributes.CoreAttributes +import org.apache.nifi.json.JsonRecordSetWriter +import org.apache.nifi.mongodb.MongoDBClientService +import org.apache.nifi.mongodb.MongoDBControllerService +import org.apache.nifi.schema.access.SchemaAccessUtils +import org.apache.nifi.serialization.DateTimeUtils +import org.apache.nifi.serialization.SimpleRecordSchema +import org.apache.nifi.serialization.record.* +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.bson.Document +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.Test + +import static groovy.json.JsonOutput.* + +class GetMongoRecordIT { + TestRunner runner + MongoDBClientService service + + static RecordSchema SCHEMA + static final String DB_NAME = GetMongoRecord.class.simpleName + Calendar.instance.timeInMillis + static final String COL_NAME = "test" + static final String URI = "mongodb://localhost:27017" + + static { + def fields = [ + new RecordField("name", RecordFieldType.STRING.dataType), + new RecordField("failedLogins", RecordFieldType.INT.dataType), + new RecordField("lastLogin", RecordFieldType.DATE.dataType) + ] + SCHEMA = new SimpleRecordSchema(fields, new StandardSchemaIdentifier.Builder().name("sample").build()) + } + + static final List<Map> SAMPLES = [ + [ name: "John Smith", failedLogins: 2, lastLogin: Calendar.instance.time ], + [ name: "Jane Doe", failedLogins: 1, lastLogin: Calendar.instance.time - 360000 ], + [ name: "John Brown", failedLogins: 4, lastLogin: Calendar.instance.time - 10000 ] + ].collect { new Document(it) } + + @Before + void setup() { + runner = TestRunners.newTestRunner(GetMongoRecord.class) + service = new MongoDBControllerService() + runner.addControllerService("client", service) + runner.setProperty(service, MongoDBControllerService.URI, URI) + 
runner.enableControllerService(service) + + def writer = new JsonRecordSetWriter() + def registry = new MockSchemaRegistry() + registry.addSchema("sample", SCHEMA) + + runner.addControllerService("writer", writer) + runner.addControllerService("registry", registry) + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry") + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY) + runner.setProperty(writer, DateTimeUtils.DATE_FORMAT, "yyyy") + runner.enableControllerService(registry) + runner.enableControllerService(writer) + + runner.setProperty(GetMongoRecord.DATABASE_NAME, DB_NAME) + runner.setProperty(GetMongoRecord.COLLECTION_NAME, COL_NAME) + runner.setProperty(GetMongoRecord.CLIENT_SERVICE, "client") + runner.setProperty(GetMongoRecord.WRITER_FACTORY, "writer") + + service.getDatabase(DB_NAME).getCollection(COL_NAME).insertMany(SAMPLES) + } + + @After + void after() { + service.getDatabase(DB_NAME).drop() + } + + @Test + void testLookup() { + def ffValidator = { TestRunner runner -> + def ffs = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS) + Assert.assertNotNull(ffs) + Assert.assertTrue(ffs.size() == 1) + Assert.assertEquals("3", ffs[0].getAttribute("record.count")) + Assert.assertEquals("application/json", ffs[0].getAttribute(CoreAttributes.MIME_TYPE.key())) + Assert.assertEquals(COL_NAME, ffs[0].getAttribute(GetMongoRecord.COL_NAME)) + Assert.assertEquals(DB_NAME, ffs[0].getAttribute(GetMongoRecord.DB_NAME)) + Assert.assertEquals(Document.parse("{}"), Document.parse(ffs[0].getAttribute("executed.query"))) + } + + runner.setProperty(GetMongoRecord.QUERY_ATTRIBUTE, "executed.query") + runner.setProperty(GetMongoRecord.QUERY, "{}") + runner.enqueue("", [ "schema.name": "sample"]) + runner.run() + + runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0) + runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1) + runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 
1) + + ffValidator(runner) + + runner.clearTransferState() + runner.removeProperty(GetMongoRecord.QUERY) + runner.enqueue("{}", [ "schema.name": "sample"]) + runner.run() + + runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0) + runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1) + runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 1) + + ffValidator(runner) + } + + @Test + void testSortAndProjection() { + runner.setIncomingConnection(false) + runner.setVariable("schema.name", "sample") + runner.setProperty(GetMongoRecord.SORT, toJson([failedLogins: 1])) + runner.setProperty(GetMongoRecord.PROJECTION, toJson([failedLogins: 1])) + runner.setProperty(GetMongoRecord.QUERY, "{}") + runner.run() + + def parsed = sharedTest() + Assert.assertEquals(3, parsed.size()) + def values = [1, 2, 4] + int index = 0 + parsed.each { + Assert.assertEquals(values[index++], it["failedLogins"]) + Assert.assertNull(it["name"]) + Assert.assertNull(it["lastLogin"]) + } + } + + List<Map<String, Object>> sharedTest() { + runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0) + runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1) + + def ff = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS)[0] + def raw = runner.getContentAsByteArray(ff) + String content = new String(raw) + def parsed = new JsonSlurper().parseText(content) + Assert.assertNotNull(parsed) + + parsed + } + + @Test + void testLimit() { + runner.setIncomingConnection(false) + runner.setProperty(GetMongoRecord.LIMIT, "1") + runner.setProperty(GetMongoRecord.QUERY, "{}") + runner.setVariable("schema.name", "sample") + runner.run() + + def parsed = sharedTest() + Assert.assertEquals(1, parsed.size()) + + } +}
