Repository: nifi
Updated Branches:
  refs/heads/master 82ef67195 -> 517279744


NIFI-4122 Added the ability to combine multiple Mongo result documents into a 
single output JSON array.

Signed-off-by: Pierre Villard <[email protected]>

This closes #1948.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/51727974
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/51727974
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/51727974

Branch: refs/heads/master
Commit: 51727974485ffb9617c8d510377e70ea7b50cb53
Parents: 82ef671
Author: Mike Thomsen <[email protected]>
Authored: Mon Jun 26 11:06:35 2017 -0400
Committer: Pierre Villard <[email protected]>
Committed: Thu Jun 29 12:43:52 2017 +0200

----------------------------------------------------------------------
 .../nifi/processors/mongodb/GetMongo.java       | 93 +++++++++++++++++---
 .../nifi/processors/mongodb/GetMongoTest.java   | 11 +++
 2 files changed, 91 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/51727974/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index fc00a08..8b73b02 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -24,8 +24,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -36,6 +40,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -44,10 +49,8 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.bson.Document;
+import org.codehaus.jackson.map.ObjectMapper;
 
-import com.mongodb.client.FindIterable;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
 
 @Tags({ "mongodb", "read", "get" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -99,6 +102,13 @@ public class GetMongo extends AbstractMongoProcessor {
         .required(false)
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .build();
+    static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
+        .name("results-per-flowfile")
+        .displayName("Results Per FlowFile")
+        .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
 
     private final static Set<Relationship> relationships;
     private final static List<PropertyDescriptor> propertyDescriptors;
@@ -111,6 +121,7 @@ public class GetMongo extends AbstractMongoProcessor {
         _propertyDescriptors.add(SORT);
         _propertyDescriptors.add(LIMIT);
         _propertyDescriptors.add(BATCH_SIZE);
+        _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
         _propertyDescriptors.add(SSL_CONTEXT_SERVICE);
         _propertyDescriptors.add(CLIENT_AUTH);
         propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@@ -130,6 +141,32 @@ public class GetMongo extends AbstractMongoProcessor {
         return propertyDescriptors;
     }
 
+    private ObjectMapper mapper = new ObjectMapper();
+
+    //Turn a list of Mongo result documents into a String representation of a JSON array
+    private String buildBatch(List<Document> documents) throws IOException {
+        List<Map> docs = new ArrayList<>();
+        for (Document document : documents) {
+            String asJson = document.toJson();
+            docs.add(mapper.readValue(asJson, Map.class));
+        }
+
+        return mapper.writeValueAsString(docs);
+    }
+
+    private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write(payload.getBytes("UTF-8"));
+            }
+        });
+        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+        session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final ComponentLog logger = getLogger();
@@ -156,19 +193,49 @@ public class GetMongo extends AbstractMongoProcessor {
             }
 
             final MongoCursor<Document> cursor = it.iterator();
+            ComponentLog log = getLogger();
             try {
                 FlowFile flowFile = null;
-                while (cursor.hasNext()) {
-                    flowFile = session.create();
-                    flowFile = session.write(flowFile, new OutputStreamCallback() {
-                        @Override
-                        public void process(OutputStream out) throws IOException {
-                            IOUtils.write(cursor.next().toJson(), out);
+                if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
+                    int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
+                    List<Document> batch = new ArrayList<>();
+
+                    while (cursor.hasNext()) {
+                        batch.add(cursor.next());
+                        if (batch.size() == ceiling) {
+                            try {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Writing batch...");
+                                }
+                                String payload = buildBatch(batch);
+                                writeBatch(payload, context, session);
+                                batch = new ArrayList<>();
+                            } catch (IOException ex) {
+                                getLogger().error("Error building batch", ex);
+                            }
                         }
-                    });
-
-                    session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
-                    session.transfer(flowFile, REL_SUCCESS);
+                    }
+                    if (batch.size() > 0) {
+                        try {
+                            writeBatch(buildBatch(batch), context, session);
+                        } catch (IOException ex) {
+                            getLogger().error("Error sending remainder of batch", ex);
+                        }
+                    }
+                } else {
+                    while (cursor.hasNext()) {
+                        flowFile = session.create();
+                        flowFile = session.write(flowFile, new OutputStreamCallback() {
+                            @Override
+                            public void process(OutputStream out) throws IOException {
+                                IOUtils.write(cursor.next().toJson(), out);
+                            }
+                        });
+                        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+
+                        session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
+                        session.transfer(flowFile, REL_SUCCESS);
+                    }
                 }
 
                 session.commit();

http://git-wip-us.apache.org/repos/asf/nifi/blob/51727974/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
index 810fc4d..e39148d 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoTest.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
@@ -200,4 +201,14 @@ public class GetMongoTest {
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
         flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson());
     }
+
+    @Test
+    public void testResultsPerFlowfile() throws Exception {
+        runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 2);
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
+        Assert.assertTrue("Flowfile was empty", results.get(0).getSize() > 0);
+        Assert.assertEquals("Wrong mime type", results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json");
+    }
 }

Reply via email to