[ https://issues.apache.org/jira/browse/NIFI-4122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066680#comment-16066680 ]
ASF GitHub Bot commented on NIFI-4122:
--------------------------------------
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1948#discussion_r124570993
--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -158,17 +191,43 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final MongoCursor<Document> cursor = it.iterator();
         try {
             FlowFile flowFile = null;
-            while (cursor.hasNext()) {
-                flowFile = session.create();
-                flowFile = session.write(flowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(OutputStream out) throws IOException {
-                        IOUtils.write(cursor.next().toJson(), out);
+            if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
+                int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
+                List<Document> batch = new ArrayList<>();
+
+                while (cursor.hasNext()) {
+                    batch.add(cursor.next());
+                    if (batch.size() == ceiling) {
+                        try {
+                            getLogger().info("Writing batch...");
+                            String payload = buildBatch(batch);
+                            writeBatch(payload, context, session);
+                            batch = new ArrayList<>();
+                        } catch (IOException ex) {
+                            getLogger().error("Error building batch", ex);
+                        }
                     }
-                });
-
-                session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
-                session.transfer(flowFile, REL_SUCCESS);
+                }
+                if (batch.size() > 0) {
+                    try {
+                        writeBatch(buildBatch(batch), context, session);
+                    } catch (IOException ex) {
+                        getLogger().error("Error sending remainder of batch", ex);
+                    }
+                }
+            } else {
+                while (cursor.hasNext()) {
+                    flowFile = session.create();
+                    flowFile = session.write(flowFile, new OutputStreamCallback() {
+                        @Override
+                        public void process(OutputStream out) throws IOException {
+                            IOUtils.write(cursor.next().toJson(), out);
+                        }
+                    });
+
--- End diff --
Could you also add the following?
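````java
import org.apache.nifi.flowfile.attributes.CoreAttributes;

// Tag each flow file as JSON so the UI content viewer can render it accordingly
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
````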
It's not mandatory, but it will improve the UX when viewing flow file content
in the UI.
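The batching branch in the diff comes down to a flush-at-ceiling loop followed by a flush of whatever remains. A minimal, NiFi-free sketch of that control flow might look like this. `groupIntoBatches` is a hypothetical name, a plain `Iterator` stands in for `MongoCursor`, and each inner list stands in for one `writeBatch(buildBatch(batch), context, session)` call (the bodies of those two helpers are not shown in the diff).

````java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Main {

    /**
     * Hypothetical helper: groups the items produced by a cursor-like iterator
     * into batches of at most {@code ceiling} items. Each inner list stands in
     * for one outgoing flow file.
     */
    static <T> List<List<T>> groupIntoBatches(Iterator<T> cursor, int ceiling) {
        if (ceiling <= 0) {
            throw new IllegalArgumentException("ceiling must be positive");
        }
        List<List<T>> emitted = new ArrayList<>();
        List<T> batch = new ArrayList<>();
        while (cursor.hasNext()) {
            batch.add(cursor.next());
            if (batch.size() >= ceiling) {
                emitted.add(batch);        // stand-in for writeBatch(buildBatch(batch), ...)
                batch = new ArrayList<>(); // start a fresh batch unconditionally
            }
        }
        if (!batch.isEmpty()) {
            emitted.add(batch);            // flush the partial final batch
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> results = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            results.add(i);
        }
        // prints 100, 100 and 50, one per line
        for (List<Integer> b : groupIntoBatches(results.iterator(), 100)) {
            System.out.println(b.size());
        }
    }
}
````

This sketch always starts a new list after a batch is emitted. In the diff, `batch` is reset only after `writeBatch` succeeds, so after an `IOException` the list grows past `ceiling`, the `batch.size() == ceiling` check never matches again, and all remaining documents end up in the single remainder batch. Resetting the list in a `finally` block, or comparing with `>=`, would avoid that.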
> GetMongo should be able to group results into a set of flowfiles
> ----------------------------------------------------------------
>
> Key: NIFI-4122
> URL: https://issues.apache.org/jira/browse/NIFI-4122
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Mike Thomsen
> Priority: Minor
> Labels: getmongo, mongodb, nifi
>
> GetMongo should be able to take a user-defined limit and group results into
> flowfiles of that size, rather than only supporting a 1:1 relationship
> between results and flowfiles.
> For example, if the user specifies 100, 100 results should be grouped
> together and turned into a JSON array that can be broken up later as needed.
> This need arose during a bulk data ingestion from Mongo. We had just shy of
> 400k documents, and generating one flowfile per document blew right through
> our limits on the content repository. Adding this feature would make it
> feasible to control that behavior more precisely for events like bulk
> ingestion.
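The grouping requested in the issue can be sketched in plain Java. The helpers `toJsonArray` and `flowFileCount` are hypothetical names, not NiFi or MongoDB APIs. The sketch assumes each result has already been serialized to a complete JSON object string (for example with `Document.toJson()`), so wrapping the comma-joined strings in brackets yields a valid JSON array.

````java
import java.util.Arrays;
import java.util.List;

public class Main {

    /**
     * Hypothetical helper: wraps already-serialized JSON objects in one JSON array.
     * Assumes every element is a complete, valid JSON value.
     */
    static String toJsonArray(List<String> jsonDocs) {
        return "[" + String.join(",", jsonDocs) + "]";
    }

    /** Hypothetical helper: number of flow files when results are grouped {@code limit} at a time. */
    static long flowFileCount(long results, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        return (results + limit - 1) / limit; // ceiling division
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("{\"a\":1}", "{\"a\":2}");
        System.out.println(toJsonArray(docs));            // [{"a":1},{"a":2}]
        System.out.println(flowFileCount(250, 100));      // 3
        System.out.println(flowFileCount(400_000, 100));  // 4000
    }
}
````

With a limit of 100, a result set of fewer than 400k documents becomes at most 4,000 flow files, instead of one flow file per document under the 1:1 behavior.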
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)