[ https://issues.apache.org/jira/browse/NIFI-4838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352496#comment-16352496 ]

ASF GitHub Bot commented on NIFI-4838:
--------------------------------------

Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2448#discussion_r165994104
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -221,22 +258,33 @@ private void configureMapper(String setting) {
             }
         }
     
    -    private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
    -        return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
    +    private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
    +        return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
                     : mapper.writer();
         }
     
    -    private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
    +    private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
             FlowFile flowFile = session.create();
             flowFile = session.write(flowFile, new OutputStreamCallback() {
                 @Override
                 public void process(OutputStream out) throws IOException {
                     out.write(payload.getBytes("UTF-8"));
                 }
             });
    -        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +        Map<String, String> attrs = new HashMap<>();
    +        attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
    +        if (count != null) {
    +            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
    +            attrs.put(PROGRESS_END, String.valueOf(index));
    +            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
    +        }
    +        flowFile = session.putAllAttributes(flowFile, attrs);
             session.getProvenanceReporter().receive(flowFile, getURI(context));
    +
             session.transfer(flowFile, REL_SUCCESS);
    +        if (doCommit) {
    +            session.commit();
    +        }
    --- End diff --
    
    Is the motivation behind this change to add these progress attributes to
    the flowfiles and to commit after every batch? I'm asking because the
    associated JIRA says the current GetMongo gives the perception "that it has
    hung to a user who is pulling a very large data set", but the current
    version also transfers a flowfile for every batch, right? If the intent of
    that JIRA is to add progress attributes to the flowfiles and commit after
    every batch, then it looks good to me.
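A toy model of why the per-batch commit matters, assuming GetMongo relies on AbstractProcessor, where the framework commits the session once onTrigger returns. A NiFi ProcessSession is transactional: FlowFiles transferred to a relationship only reach the downstream connection when the session is committed. So transferring one flowfile per batch still shows nothing downstream until that final commit. The sketch is a self-contained simulation; `CommitVisibilityDemo`, `ToySession` and `run` are hypothetical names and not part of the NiFi API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT the NiFi API) of a transactional session:
// transferred items are staged and only become visible downstream on commit().
class CommitVisibilityDemo {

    static class ToySession {
        private final List<String> staged = new ArrayList<>();
        private final List<String> downstream = new ArrayList<>();

        void transfer(String flowFile) {
            staged.add(flowFile);            // staged, not yet visible downstream
        }

        void commit() {
            downstream.addAll(staged);       // release everything staged so far
            staged.clear();
        }

        int visibleDownstream() {
            return downstream.size();
        }
    }

    // Writes one "flowfile" per batch and records how many are visible
    // downstream right after each batch has been written.
    static List<Integer> run(int batches, boolean commitPerBatch) {
        ToySession session = new ToySession();
        List<Integer> visibleAfterEachBatch = new ArrayList<>();
        for (int i = 0; i < batches; i++) {
            session.transfer("batch-" + i);
            if (commitPerBatch) {
                session.commit();            // hypothetical per-batch commit
            }
            visibleAfterEachBatch.add(session.visibleDownstream());
        }
        if (!commitPerBatch) {
            session.commit();                // single commit at the very end
        }
        return visibleAfterEachBatch;
    }

    public static void main(String[] args) {
        System.out.println(run(3, false)); // [0, 0, 0]
        System.out.println(run(3, true));  // [1, 2, 3]
    }
}
```

With a single commit the downstream count stays at 0 until the whole query has been read; committing per batch releases each batch as soon as it is written, which is the visible progress the JIRA asks for.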


> Make GetMongo support multiple commits and give some progress indication
> ------------------------------------------------------------------------
>
>                 Key: NIFI-4838
>                 URL: https://issues.apache.org/jira/browse/NIFI-4838
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Mike Thomsen
>            Assignee: Mike Thomsen
>            Priority: Major
>
> It shouldn't wait until the end to do a commit() call because the effect is 
> that GetMongo looks like it has hung to a user who is pulling a very large 
> data set.
> It should also have an option for running a count query to get the current 
> approximate count of documents that would match the query and append an 
> attribute that indicates where a flowfile stands in the total result count. 
> Ex:
> query.progress.point.start = 2500
> query.progress.point.end = 5000
> query.count.estimate = 17,568,231
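The attribute scheme proposed above can be sketched as follows, assuming the processor knows how many documents each batch actually contained. The attribute keys are taken from the JIRA description; the diff uses constants (PROGRESS_START, PROGRESS_END, PROGRESS_ESTIMATE) whose values are not shown, so treat the keys as an assumption. `ProgressAttributes` and `forBatches` are hypothetical names, and the end of each range is treated here as exclusive. Deriving each start from the previous end, rather than computing `index - batchSize` as the diff does, keeps the range correct when the final batch is shorter than the configured batch size, assuming `index` counts the documents written so far.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds the progress attributes for each batch.
class ProgressAttributes {

    // Keys as proposed in the JIRA (assumed; the diff's constant values are not shown).
    static final String PROGRESS_START = "query.progress.point.start";
    static final String PROGRESS_END = "query.progress.point.end";
    static final String PROGRESS_ESTIMATE = "query.count.estimate";

    // actualBatchSizes: documents actually written per batch.
    // countEstimate: approximate total from a count query run up front.
    static List<Map<String, String>> forBatches(int[] actualBatchSizes, long countEstimate) {
        List<Map<String, String>> result = new ArrayList<>();
        long start = 0;
        for (int size : actualBatchSizes) {
            long end = start + size;         // exclusive end of this batch
            Map<String, String> attrs = new LinkedHashMap<>();
            attrs.put(PROGRESS_START, String.valueOf(start));
            attrs.put(PROGRESS_END, String.valueOf(end));
            attrs.put(PROGRESS_ESTIMATE, String.valueOf(countEstimate));
            result.add(attrs);
            start = end;                     // next batch starts where this one ended
        }
        return result;
    }

    public static void main(String[] args) {
        // Two full batches of 2500 and a short final batch of 1000.
        List<Map<String, String>> batches = forBatches(new int[] {2500, 2500, 1000}, 17_568_231L);
        // The second batch gets start=2500, end=5000, matching the JIRA example.
        for (Map<String, String> attrs : batches) {
            System.out.println(attrs);
        }
    }
}
```

Note that String.valueOf(long) renders the estimate as 17568231, without the thousands separators shown in the JIRA example.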



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
