[https://issues.apache.org/jira/browse/NIFI-4838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352496#comment-16352496]
ASF GitHub Bot commented on NIFI-4838:
--------------------------------------
Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r165994104
--- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
}
}
- private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
- return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
+ private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
+ return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
: mapper.writer();
}
- private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
+ private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(payload.getBytes("UTF-8"));
}
});
- flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+ if (count != null) {
+ attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+ attrs.put(PROGRESS_END, String.valueOf(index));
+ attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+ }
+ flowFile = session.putAllAttributes(flowFile, attrs);
session.getProvenanceReporter().receive(flowFile, getURI(context));
+
session.transfer(flowFile, REL_SUCCESS);
+ if (doCommit) {
+ session.commit();
+ }
--- End diff --
Is the motivation behind this change to add these progress attributes to the
flowfiles and to commit after every batch? I'm asking because the associated
JIRA says the current GetMongo gives the impression "that it has hung to a user
who is pulling a very large data set", but the current version also transfers a
flowfile for every batch, right? If the intent of that JIRA is to add the
progress attributes to the flowfiles and commit after every batch, then it
looks good to me.
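To illustrate the distinction the question turns on: even before this change GetMongo transfers one flowfile per batch, but NiFi only releases transferred flowfiles to the downstream connection when the session is committed, so with a single commit at the end nothing appears downstream until the whole query finishes. The sketch below models that behaviour in plain Java rather than against NiFi's API; the `Session` class is a hypothetical stand-in for `ProcessSession`, and `pullInBatches` is a hypothetical simplification of the processor's batching loop, not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchCommitSketch {

    /** Hypothetical stand-in for NiFi's ProcessSession: transfers are staged until commit. */
    static class Session {
        private final List<String> staged = new ArrayList<>();
        private final List<String> visibleDownstream = new ArrayList<>();
        int commits = 0;

        void transfer(String flowFile) {
            staged.add(flowFile);
        }

        void commit() {
            // Only committed flowfiles become visible to the next component.
            visibleDownstream.addAll(staged);
            staged.clear();
            commits++;
        }

        List<String> visible() {
            return visibleDownstream;
        }
    }

    /**
     * Hypothetical batching loop: writes one "flowfile" per batch of documents.
     * With commitEachBatch = true each full batch is committed immediately;
     * otherwise everything is committed once at the end.
     */
    static void pullInBatches(List<String> docs, int batchSize, boolean commitEachBatch, Session session) {
        List<String> batch = new ArrayList<>();
        for (String doc : docs) {
            batch.add(doc);
            if (batch.size() == batchSize) {
                session.transfer(String.join(",", batch));
                batch.clear();
                if (commitEachBatch) {
                    session.commit();
                }
            }
        }
        if (!batch.isEmpty()) {
            session.transfer(String.join(",", batch)); // trailing partial batch
        }
        session.commit(); // final commit covers anything still staged
    }

    public static void main(String[] args) {
        Session perBatch = new Session();
        pullInBatches(List.of("a", "b", "c", "d", "e"), 2, true, perBatch);
        System.out.println(perBatch.commits + " " + perBatch.visible());
    }
}
```

With five documents and a batch size of 2, committing per batch yields three commits, and the first two batches become visible before the last one is written; with `commitEachBatch` set to false the same three flowfiles appear only at the single final commit.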
> Make GetMongo support multiple commits and give some progress indication
> ------------------------------------------------------------------------
>
> Key: NIFI-4838
> URL: https://issues.apache.org/jira/browse/NIFI-4838
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Mike Thomsen
> Assignee: Mike Thomsen
> Priority: Major
>
> It shouldn't wait until the end to call commit(), because to a user who is
> pulling a very large data set GetMongo then looks like it has hung.
> It should also have an option for running a count query to get the current
> approximate count of documents that would match the query, and add
> attributes that indicate where each flowfile stands in the total result count.
> Ex:
> query.progress.point.start = 2500
> query.progress.point.end = 5000
> query.count.estimate = 17,568,231
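The attribute arithmetic in the diff's `writeBatch` can be checked against this example: with a batch size of 2500, the second batch ends at index 5000 and so starts at 5000 - 2500 = 2500. The sketch below reproduces that computation in plain Java. The attribute names are taken from the JIRA example, since the actual values of the `PROGRESS_START`, `PROGRESS_END` and `PROGRESS_ESTIMATE` constants are not visible in the diff, and `progressAttributes` is a hypothetical helper. Assuming `index` is the running count of documents written so far, `index - batchSize` would understate the start of a final batch holding fewer than `batchSize` documents, so the sketch takes the batch's actual document count instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProgressAttributesSketch {

    // Names copied from the NIFI-4838 example; the real constant values may differ.
    static final String PROGRESS_START = "query.progress.point.start";
    static final String PROGRESS_END = "query.progress.point.end";
    static final String PROGRESS_ESTIMATE = "query.count.estimate";

    /**
     * Hypothetical helper that builds the attributes for one batch.
     *
     * @param count     estimated total matches, or null if no count query was run
     * @param index     running number of documents written, including this batch (assumed)
     * @param batchDocs number of documents actually contained in this batch
     */
    static Map<String, String> progressAttributes(Long count, long index, int batchDocs) {
        Map<String, String> attrs = new LinkedHashMap<>();
        // "mime.type" is the key returned by CoreAttributes.MIME_TYPE.key().
        attrs.put("mime.type", "application/json");
        if (count != null) {
            attrs.put(PROGRESS_START, String.valueOf(index - batchDocs));
            attrs.put(PROGRESS_END, String.valueOf(index));
            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
        }
        return attrs;
    }

    public static void main(String[] args) {
        // Second full batch of 2500 out of an estimated 17,568,231 matches.
        System.out.println(progressAttributes(17_568_231L, 5000, 2500));
    }
}
```

Called with `(17_568_231L, 5000, 2500)` it yields start 2500, end 5000 and estimate 17568231, matching the example apart from the digit grouping; with a null `count` only the MIME type is set, mirroring the `if (count != null)` guard in the diff.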