[ https://issues.apache.org/jira/browse/NIFI-4122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066680#comment-16066680 ]

ASF GitHub Bot commented on NIFI-4122:
--------------------------------------

Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1948#discussion_r124570993
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -158,17 +191,43 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final MongoCursor<Document> cursor = it.iterator();
                 try {
                     FlowFile flowFile = null;
    -                while (cursor.hasNext()) {
    -                    flowFile = session.create();
    -                    flowFile = session.write(flowFile, new OutputStreamCallback() {
    -                        @Override
    -                        public void process(OutputStream out) throws IOException {
    -                            IOUtils.write(cursor.next().toJson(), out);
    +                if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
    +                    int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
    +                    List<Document> batch = new ArrayList<>();
    +
    +                    while (cursor.hasNext()) {
    +                        batch.add(cursor.next());
    +                        if (batch.size() == ceiling) {
    +                            try {
    +                                getLogger().info("Writing batch...");
    +                                String payload = buildBatch(batch);
    +                                writeBatch(payload, context, session);
    +                                batch = new ArrayList<>();
    +                            } catch (IOException ex) {
    +                                getLogger().error("Error building batch", ex);
    +                            }
                             }
    -                    });
    -
    -                    session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
    -                    session.transfer(flowFile, REL_SUCCESS);
    +                    }
    +                    if (batch.size() > 0) {
    +                        try {
    +                            writeBatch(buildBatch(batch), context, session);
    +                        } catch (IOException ex) {
    +                            getLogger().error("Error sending remainder of batch", ex);
    +                        }
    +                    }
    +                } else {
    +                    while (cursor.hasNext()) {
    +                        flowFile = session.create();
    +                        flowFile = session.write(flowFile, new OutputStreamCallback() {
    +                            @Override
    +                            public void process(OutputStream out) throws IOException {
    +                                IOUtils.write(cursor.next().toJson(), out);
    +                            }
    +                        });
    +
    --- End diff --
    
    Could you also add the following?
    ````java
    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    ````
    It's not mandatory, but it will improve the UX when viewing flow file content in the UI.
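
    A minimal sketch (not part of the PR) of where that attribute could slot into the per-document branch shown in the diff, assuming the existing write/provenance/transfer calls stay as they are; CoreAttributes here is org.apache.nifi.flowfile.attributes.CoreAttributes:
    ````java
    flowFile = session.create();
    flowFile = session.write(flowFile, new OutputStreamCallback() {
        @Override
        public void process(OutputStream out) throws IOException {
            IOUtils.write(cursor.next().toJson(), out);
        }
    });
    // Tag the content so the UI's content viewer renders it as JSON.
    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
    session.transfer(flowFile, REL_SUCCESS);
    ````
    The same putAttribute call would presumably apply to the batched branch as well, wherever writeBatch creates its flow file.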


> GetMongo should be able to group results into a set of flowfiles
> ----------------------------------------------------------------
>
>                 Key: NIFI-4122
>                 URL: https://issues.apache.org/jira/browse/NIFI-4122
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Mike Thomsen
>            Priority: Minor
>              Labels: getmongo, mongodb, nifi
>
> GetMongo should be able to take a user-defined limit and group that many 
> results into each flowfile, rather than only supporting a 1:1 relationship 
> between result and flowfile.
> For example, if the user specifies 100, 100 results should be grouped 
> together and turned into a JSON array that can be broken up later as needed.
> This need arose when doing a bulk data ingestion from Mongo. We had just shy 
> of 400k documents, and the 1:1 generation of flowfiles blew right through our 
> limits on the content repository. Adding this feature would make it feasible 
> to control that sort of behavior for events like bulk ingestion.
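
A minimal sketch of the grouping described above. The buildBatch helper referenced in the diff is not shown there, so this is an assumption of how such a helper might assemble the batch into a JSON array, not the PR's actual code:
````java
// Hypothetical helper (assumes java.util.List and org.bson.Document are imported):
// concatenates each document's JSON into one array so a single flow file can
// carry up to the configured number of results.
private String buildBatch(List<Document> documents) {
    StringBuilder builder = new StringBuilder("[");
    for (int i = 0; i < documents.size(); i++) {
        builder.append(documents.get(i).toJson());
        if (i < documents.size() - 1) {
            builder.append(",");
        }
    }
    return builder.append("]").toString();
}
````
A downstream processor such as SplitJson can then break the array back into individual records when needed.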



