Repository: nifi
Updated Branches:
  refs/heads/master 329b1caf8 -> 8b54c2604


NIFI-4082 - Added EL on GetMongo properties

NIFI-4082 - Added EL on DB, URI and Collection

NIFI-4082 - Added UT for EL evaluation (URI, DB, Collection) and fixed ex. 
message for document validator.

This closes #1969


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8b54c260
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8b54c260
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8b54c260

Branch: refs/heads/master
Commit: 8b54c2604cde4f3516d89a0f48650f7c30e34c3b
Parents: 329b1ca
Author: Pierre Villard <pierre.villard...@gmail.com>
Authored: Fri Jun 30 20:43:50 2017 +0200
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Mon Aug 7 13:40:27 2017 -0400

----------------------------------------------------------------------
 .../mongodb/AbstractMongoProcessor.java         | 27 +++++++++++++++-----
 .../nifi/processors/mongodb/GetMongo.java       | 27 +++++++++++++++-----
 .../nifi/processors/mongodb/PutMongo.java       |  4 +--
 .../nifi/processors/mongodb/GetMongoTest.java   | 18 ++++++++-----
 .../nifi/processors/mongodb/PutMongoTest.java   |  9 ++++---
 5 files changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index 486a077..6f165c2 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.authentication.exception.ProviderCreationException;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -48,18 +49,21 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
         .name("Mongo URI")
         .description("MongoURI, typically of the form: 
mongodb://host1[:port1][,host2[:port2],...]")
         .required(true)
+        .expressionLanguageSupported(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
     protected static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
         .name("Mongo Database Name")
         .description("The name of the database to use")
         .required(true)
+        .expressionLanguageSupported(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
     protected static final PropertyDescriptor COLLECTION_NAME = new 
PropertyDescriptor.Builder()
         .name("Mongo Collection Name")
         .description("The name of the collection to use")
         .required(true)
+        .expressionLanguageSupported(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
@@ -124,11 +128,10 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
         }
 
         try {
-            final String uri = context.getProperty(URI).getValue();
             if(sslContext == null) {
-                mongoClient = new MongoClient(new MongoClientURI(uri));
+                mongoClient = new MongoClient(new 
MongoClientURI(getURI(context)));
             } else {
-                mongoClient = new MongoClient(new MongoClientURI(uri, 
getClientOptions(sslContext)));
+                mongoClient = new MongoClient(new 
MongoClientURI(getURI(context), getClientOptions(sslContext)));
             }
         } catch (Exception e) {
             getLogger().error("Failed to schedule {} due to {}", new Object[] 
{ this.getClass().getName(), e }, e);
@@ -153,12 +156,24 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
     }
 
     protected MongoDatabase getDatabase(final ProcessContext context) {
-        final String databaseName = 
context.getProperty(DATABASE_NAME).getValue();
+        return getDatabase(context, null);
+    }
+
+    protected MongoDatabase getDatabase(final ProcessContext context, final 
FlowFile flowFile) {
+        final String databaseName = 
context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
         return mongoClient.getDatabase(databaseName);
     }
 
     protected MongoCollection<Document> getCollection(final ProcessContext 
context) {
-        final String collectionName = 
context.getProperty(COLLECTION_NAME).getValue();
-        return getDatabase(context).getCollection(collectionName);
+        return getCollection(context, null);
+    }
+
+    protected MongoCollection<Document> getCollection(final ProcessContext 
context, final FlowFile flowFile) {
+        final String collectionName = 
context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        return getDatabase(context, flowFile).getCollection(collectionName);
+    }
+
+    protected String getURI(final ProcessContext context) {
+        return 
context.getProperty(URI).evaluateAttributeExpressions().getValue();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index 8b73b02..79f50b2 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -59,14 +59,21 @@ public class GetMongo extends AbstractMongoProcessor {
     public static final Validator DOCUMENT_VALIDATOR = new Validator() {
         @Override
         public ValidationResult validate(final String subject, final String 
value, final ValidationContext context) {
+            final ValidationResult.Builder builder = new 
ValidationResult.Builder();
+            builder.subject(subject).input(value);
+
+            if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(value)) {
+                return builder.valid(true).explanation("Contains Expression 
Language").build();
+            }
+
             String reason = null;
             try {
                 Document.parse(value);
             } catch (final RuntimeException e) {
-                reason = e.getClass().getName();
+                reason = e.getLocalizedMessage();
             }
 
-            return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null).build();
+            return builder.explanation(reason).valid(reason == null).build();
         }
     };
 
@@ -76,18 +83,21 @@ public class GetMongo extends AbstractMongoProcessor {
         .name("Query")
         .description("The selection criteria; must be a valid MongoDB Extended 
JSON format; if omitted the entire collection will be queried")
         .required(false)
+        .expressionLanguageSupported(true)
         .addValidator(DOCUMENT_VALIDATOR)
         .build();
     static final PropertyDescriptor PROJECTION = new 
PropertyDescriptor.Builder()
         .name("Projection")
         .description("The fields to be returned from the documents in the 
result set; must be a valid BSON document")
         .required(false)
+        .expressionLanguageSupported(true)
         .addValidator(DOCUMENT_VALIDATOR)
         .build();
     static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
         .name("Sort")
         .description("The fields by which to sort; must be a valid BSON 
document")
         .required(false)
+        .expressionLanguageSupported(true)
         .addValidator(DOCUMENT_VALIDATOR)
         .build();
     static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
@@ -163,7 +173,7 @@ public class GetMongo extends AbstractMongoProcessor {
             }
         });
         flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/json");
-        session.getProvenanceReporter().receive(flowFile, 
context.getProperty(URI).getValue());
+        session.getProvenanceReporter().receive(flowFile, getURI(context));
         session.transfer(flowFile, REL_SUCCESS);
     }
 
@@ -171,9 +181,12 @@ public class GetMongo extends AbstractMongoProcessor {
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         final ComponentLog logger = getLogger();
 
-        final Document query = context.getProperty(QUERY).isSet() ? 
Document.parse(context.getProperty(QUERY).getValue()) : null;
-        final Document projection = context.getProperty(PROJECTION).isSet() ? 
Document.parse(context.getProperty(PROJECTION).getValue()) : null;
-        final Document sort = context.getProperty(SORT).isSet() ? 
Document.parse(context.getProperty(SORT).getValue()) : null;
+        final Document query = context.getProperty(QUERY).isSet()
+                ? 
Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions().getValue())
 : null;
+        final Document projection = context.getProperty(PROJECTION).isSet()
+                ? 
Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue())
 : null;
+        final Document sort = context.getProperty(SORT).isSet()
+                ? 
Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue())
 : null;
 
         final MongoCollection<Document> collection = getCollection(context);
 
@@ -233,7 +246,7 @@ public class GetMongo extends AbstractMongoProcessor {
                         });
                         flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/json");
 
-                        session.getProvenanceReporter().receive(flowFile, 
context.getProperty(URI).getValue());
+                        session.getProvenanceReporter().receive(flowFile, 
getURI(context));
                         session.transfer(flowFile, REL_SUCCESS);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index 51e5265..5b5ad52 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -148,7 +148,7 @@ public class PutMongo extends AbstractMongoProcessor {
         final String mode = context.getProperty(MODE).getValue();
         final WriteConcern writeConcern = getWriteConcern(context);
 
-        final MongoCollection<Document> collection = 
getCollection(context).withWriteConcern(writeConcern);
+        final MongoCollection<Document> collection = getCollection(context, 
flowFile).withWriteConcern(writeConcern);
 
         try {
             // Read the contents of the FlowFile into a byte array
@@ -176,7 +176,7 @@ public class PutMongo extends AbstractMongoProcessor {
                 logger.info("updated {} into MongoDB", new Object[] { flowFile 
});
             }
 
-            session.getProvenanceReporter().send(flowFile, 
context.getProperty(URI).getValue());
+            session.getProvenanceReporter().send(flowFile, getURI(context));
             session.transfer(flowFile, REL_SUCCESS);
         } catch (Exception e) {
             logger.error("Failed to insert {} into MongoDB due to {}", new 
Object[] {flowFile, e}, e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
index e39148d..455d705 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
@@ -60,9 +60,12 @@ public class GetMongoTest {
     @Before
     public void setup() {
         runner = TestRunners.newTestRunner(GetMongo.class);
-        runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
-        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
-        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, 
COLLECTION_NAME);
+        runner.setVariable("uri", MONGO_URI);
+        runner.setVariable("db", DB_NAME);
+        runner.setVariable("collection", COLLECTION_NAME);
+        runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
+        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
+        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, 
"${collection}");
 
         mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
 
@@ -120,8 +123,9 @@ public class GetMongoTest {
         
Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is 
invalid because org.bson.json.JsonParseException"));
 
         // invalid projection
+        runner.setVariable("projection", "{a: x,y,z}");
         runner.setProperty(GetMongo.QUERY, "{a: 1}");
-        runner.setProperty(GetMongo.PROJECTION, "{a: x,y,z}");
+        runner.setProperty(GetMongo.PROJECTION, "${projection}");
         runner.enqueue(new byte[0]);
         pc = runner.getProcessContext();
         results = new HashSet<>();
@@ -146,7 +150,8 @@ public class GetMongoTest {
 
     @Test
     public void testReadOneDocument() throws Exception {
-        runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}");
+        runner.setVariable("query", "{a: 1, b: 3}");
+        runner.setProperty(GetMongo.QUERY, "${query}");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
@@ -180,8 +185,9 @@ public class GetMongoTest {
 
     @Test
     public void testSort() throws Exception {
+        runner.setVariable("sort", "{a: -1, b: -1, c: 1}");
         runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
-        runner.setProperty(GetMongo.SORT, "{a: -1, b: -1, c: 1}");
+        runner.setProperty(GetMongo.SORT, "${sort}");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);

http://git-wip-us.apache.org/repos/asf/nifi/blob/8b54c260/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
index 6f22976..10f81d1 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
@@ -61,9 +61,12 @@ public class PutMongoTest {
     @Before
     public void setup() {
         runner = TestRunners.newTestRunner(PutMongo.class);
-        runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
-        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, 
DATABASE_NAME);
-        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, 
COLLECTION_NAME);
+        runner.setVariable("uri", MONGO_URI);
+        runner.setVariable("db", DATABASE_NAME);
+        runner.setVariable("collection", COLLECTION_NAME);
+        runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
+        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
+        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, 
"${collection}");
 
         mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
 

Reply via email to