Repository: nifi
Updated Branches:
  refs/heads/master 329b1caf8 -> 8b54c2604
NIFI-4082 - Added EL on GetMongo properties

NIFI-4082 - Added EL on DB, URI and Collection

NIFI-4082 - Added UT for EL evaluation (URI, DB, Collection) and fixed ex. message for document validator.

This closes #1969

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8b54c260
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8b54c260
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8b54c260

Branch: refs/heads/master
Commit: 8b54c2604cde4f3516d89a0f48650f7c30e34c3b
Parents: 329b1ca
Author: Pierre Villard <pierre.villard...@gmail.com>
Authored: Fri Jun 30 20:43:50 2017 +0200
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Mon Aug 7 13:40:27 2017 -0400

----------------------------------------------------------------------
 .../mongodb/AbstractMongoProcessor.java | 27 +++++++++++++++-----
 .../nifi/processors/mongodb/GetMongo.java | 27 +++++++++++++++-----
 .../nifi/processors/mongodb/PutMongo.java | 4 +--
 .../nifi/processors/mongodb/GetMongoTest.java | 18 ++++++++-----
 .../nifi/processors/mongodb/PutMongoTest.java | 9 ++++---
 5 files changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index 486a077..6f165c2 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.authentication.exception.ProviderCreationException; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; @@ -48,18 +49,21 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .name("Mongo URI") .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() .name("Mongo Database Name") .description("The name of the database to use") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() .name("Mongo Collection Name") .description("The name of the collection to use") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() @@ -124,11 +128,10 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { } try { - final String uri = context.getProperty(URI).getValue(); if(sslContext == null) { - mongoClient = new MongoClient(new MongoClientURI(uri)); + mongoClient = new MongoClient(new MongoClientURI(getURI(context))); } else { - mongoClient = new MongoClient(new MongoClientURI(uri, 
getClientOptions(sslContext))); + mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext))); } } catch (Exception e) { getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e); @@ -153,12 +156,24 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { } protected MongoDatabase getDatabase(final ProcessContext context) { - final String databaseName = context.getProperty(DATABASE_NAME).getValue(); + return getDatabase(context, null); + } + + protected MongoDatabase getDatabase(final ProcessContext context, final FlowFile flowFile) { + final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue(); return mongoClient.getDatabase(databaseName); } protected MongoCollection<Document> getCollection(final ProcessContext context) { - final String collectionName = context.getProperty(COLLECTION_NAME).getValue(); - return getDatabase(context).getCollection(collectionName); + return getCollection(context, null); + } + + protected MongoCollection<Document> getCollection(final ProcessContext context, final FlowFile flowFile) { + final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue(); + return getDatabase(context, flowFile).getCollection(collectionName); + } + + protected String getURI(final ProcessContext context) { + return context.getProperty(URI).evaluateAttributeExpressions().getValue(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 8b73b02..79f50b2 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -59,14 +59,21 @@ public class GetMongo extends AbstractMongoProcessor { public static final Validator DOCUMENT_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + final ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.subject(subject).input(value); + + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return builder.valid(true).explanation("Contains Expression Language").build(); + } + String reason = null; try { Document.parse(value); } catch (final RuntimeException e) { - reason = e.getClass().getName(); + reason = e.getLocalizedMessage(); } - return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + return builder.explanation(reason).valid(reason == null).build(); } }; @@ -76,18 +83,21 @@ public class GetMongo extends AbstractMongoProcessor { .name("Query") .description("The selection criteria; must be a valid MongoDB Extended JSON format; if omitted the entire collection will be queried") .required(false) + .expressionLanguageSupported(true) .addValidator(DOCUMENT_VALIDATOR) .build(); static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder() .name("Projection") .description("The fields to be returned from the documents in the result set; must be a valid BSON document") .required(false) + .expressionLanguageSupported(true) .addValidator(DOCUMENT_VALIDATOR) .build(); static final 
PropertyDescriptor SORT = new PropertyDescriptor.Builder() .name("Sort") .description("The fields by which to sort; must be a valid BSON document") .required(false) + .expressionLanguageSupported(true) .addValidator(DOCUMENT_VALIDATOR) .build(); static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() @@ -163,7 +173,7 @@ public class GetMongo extends AbstractMongoProcessor { } }); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); - session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); + session.getProvenanceReporter().receive(flowFile, getURI(context)); session.transfer(flowFile, REL_SUCCESS); } @@ -171,9 +181,12 @@ public class GetMongo extends AbstractMongoProcessor { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final ComponentLog logger = getLogger(); - final Document query = context.getProperty(QUERY).isSet() ? Document.parse(context.getProperty(QUERY).getValue()) : null; - final Document projection = context.getProperty(PROJECTION).isSet() ? Document.parse(context.getProperty(PROJECTION).getValue()) : null; - final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).getValue()) : null; + final Document query = context.getProperty(QUERY).isSet() + ? Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions().getValue()) : null; + final Document projection = context.getProperty(PROJECTION).isSet() + ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue()) : null; + final Document sort = context.getProperty(SORT).isSet() + ? 
Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null; final MongoCollection<Document> collection = getCollection(context); @@ -233,7 +246,7 @@ public class GetMongo extends AbstractMongoProcessor { }); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); - session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue()); + session.getProvenanceReporter().receive(flowFile, getURI(context)); session.transfer(flowFile, REL_SUCCESS); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index 51e5265..5b5ad52 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -148,7 +148,7 @@ public class PutMongo extends AbstractMongoProcessor { final String mode = context.getProperty(MODE).getValue(); final WriteConcern writeConcern = getWriteConcern(context); - final MongoCollection<Document> collection = getCollection(context).withWriteConcern(writeConcern); + final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern); try { // Read the contents of the FlowFile into a byte array @@ -176,7 +176,7 @@ public class PutMongo extends AbstractMongoProcessor { logger.info("updated {} into MongoDB", new Object[] { flowFile }); } - 
session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue()); + session.getProvenanceReporter().send(flowFile, getURI(context)); session.transfer(flowFile, REL_SUCCESS); } catch (Exception e) { logger.error("Failed to insert {} into MongoDB due to {}", new Object[] {flowFile, e}, e); http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java index e39148d..455d705 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java @@ -60,9 +60,12 @@ public class GetMongoTest { @Before public void setup() { runner = TestRunners.newTestRunner(GetMongo.class); - runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); - runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME); - runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); + runner.setVariable("uri", MONGO_URI); + runner.setVariable("db", DB_NAME); + runner.setVariable("collection", COLLECTION_NAME); + runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); + runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); + runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); @@ -120,8 +123,9 @@ public class GetMongoTest { Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is 
invalid because org.bson.json.JsonParseException")); // invalid projection + runner.setVariable("projection", "{a: x,y,z}"); runner.setProperty(GetMongo.QUERY, "{a: 1}"); - runner.setProperty(GetMongo.PROJECTION, "{a: x,y,z}"); + runner.setProperty(GetMongo.PROJECTION, "${projection}"); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); results = new HashSet<>(); @@ -146,7 +150,8 @@ public class GetMongoTest { @Test public void testReadOneDocument() throws Exception { - runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}"); + runner.setVariable("query", "{a: 1, b: 3}"); + runner.setProperty(GetMongo.QUERY, "${query}"); runner.run(); runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); @@ -180,8 +185,9 @@ public class GetMongoTest { @Test public void testSort() throws Exception { + runner.setVariable("sort", "{a: -1, b: -1, c: 1}"); runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}"); - runner.setProperty(GetMongo.SORT, "{a: -1, b: -1, c: 1}"); + runner.setProperty(GetMongo.SORT, "${sort}"); runner.run(); runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3); http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java index 6f22976..10f81d1 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java @@ -61,9 +61,12 @@ public class PutMongoTest { @Before public 
void setup() { runner = TestRunners.newTestRunner(PutMongo.class); - runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); - runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME); - runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); + runner.setVariable("uri", MONGO_URI); + runner.setVariable("db", DATABASE_NAME); + runner.setVariable("collection", COLLECTION_NAME); + runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); + runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); + runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));