[ https://issues.apache.org/jira/browse/NIFI-4838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353936#comment-16353936 ]
ASF GitHub Bot commented on NIFI-4838:
--------------------------------------
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r166320282
--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
         }
     }
-    private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
-        return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
+    private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
+        return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
                 : mapper.writer();
     }
-    private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
+    private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
         FlowFile flowFile = session.create();
         flowFile = session.write(flowFile, new OutputStreamCallback() {
             @Override
             public void process(OutputStream out) throws IOException {
                 out.write(payload.getBytes("UTF-8"));
             }
         });
-        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        if (count != null) {
+            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+            attrs.put(PROGRESS_END, String.valueOf(index));
+            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+        }
+        flowFile = session.putAllAttributes(flowFile, attrs);
         session.getProvenanceReporter().receive(flowFile, getURI(context));
+
         session.transfer(flowFile, REL_SUCCESS);
+        if (doCommit) {
+            session.commit();
+        }
--- End diff --
Yeah, that's what I wanted to understand. So it writes one flowfile per batch
and, if enabled, adds the progress attributes to each one? LGTM. +1
> Make GetMongo support multiple commits and give some progress indication
> ------------------------------------------------------------------------
>
> Key: NIFI-4838
> URL: https://issues.apache.org/jira/browse/NIFI-4838
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Mike Thomsen
> Assignee: Mike Thomsen
> Priority: Major
>
> GetMongo shouldn't wait until the end to call commit(), because to a user
> who is pulling a very large data set the processor looks like it has hung.
> It should also have an option to run a count query that returns the current
> approximate number of documents matching the query, and to add attributes
> that indicate where each flowfile stands in the total result count.
> Ex:
> query.progress.point.start = 2500
> query.progress.point.end = 5000
> query.count.estimate = 17,568,231
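
For readers following the thread, here is a minimal sketch of how the per-batch progress attributes in the example above could be derived. It mirrors the arithmetic in the quoted diff (start = index - batchSize, end = index) but is not the code from the pull request: the class name ProgressSketch and the method progressAttributes are hypothetical, and the attribute names are taken from the example in the issue description, so the constants in the actual processor may differ. It also assumes full batches only; a short final batch would need its own start calculation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; it does not depend on the NiFi API.
final class ProgressSketch {
    // Names copied from the example in the issue description (assumed, not
    // confirmed against the constants used in the pull request).
    static final String PROGRESS_START = "query.progress.point.start";
    static final String PROGRESS_END = "query.progress.point.end";
    static final String PROGRESS_ESTIMATE = "query.count.estimate";

    /**
     * Builds the progress attributes for one batch.
     *
     * @param count     estimated total from the count query, or null when
     *                  the estimate option is disabled
     * @param index     number of documents read so far, including this batch
     * @param batchSize number of documents in a full batch
     */
    static Map<String, String> progressAttributes(Long count, long index, int batchSize) {
        Map<String, String> attrs = new LinkedHashMap<>();
        if (count != null) {
            // Same arithmetic as the quoted diff: the batch covers
            // (index - batchSize, index].
            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
            attrs.put(PROGRESS_END, String.valueOf(index));
            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
        }
        return attrs;
    }

    public static void main(String[] args) {
        // Second batch of 2500 documents out of an estimated 17,568,231.
        System.out.println(progressAttributes(17_568_231L, 5000L, 2500));
    }
}
```

When the count is null the map stays empty, which matches the "if enabled" behavior discussed in the review comment: the flowfile is still written, just without progress attributes.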