[ 
https://issues.apache.org/jira/browse/NIFI-4838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353936#comment-16353936
 ] 

ASF GitHub Bot commented on NIFI-4838:
--------------------------------------

Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2448#discussion_r166320282
  
    --- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
    @@ -221,22 +258,33 @@ private void configureMapper(String setting) {
             }
         }
     
    -    private ObjectWriter getObjectWriter(ObjectMapper mapper, String 
ppSetting) {
    -        return ppSetting.equals(YES_PP.getValue()) ? 
mapper.writerWithDefaultPrettyPrinter()
    +    private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean 
ppSetting) {
    +        return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
                     : mapper.writer();
         }
     
    -    private void writeBatch(String payload, ProcessContext context, 
ProcessSession session) {
    +    private void writeBatch(String payload, ProcessContext context, 
ProcessSession session, boolean doCommit, Long count, long index, int 
batchSize) {
             FlowFile flowFile = session.create();
             flowFile = session.write(flowFile, new OutputStreamCallback() {
                 @Override
                 public void process(OutputStream out) throws IOException {
                     out.write(payload.getBytes("UTF-8"));
                 }
             });
    -        flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), "application/json");
    +        Map<String, String> attrs = new HashMap<>();
    +        attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
    +        if (count != null) {
    +            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
    +            attrs.put(PROGRESS_END, String.valueOf(index));
    +            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
    +        }
    +        flowFile = session.putAllAttributes(flowFile, attrs);
             session.getProvenanceReporter().receive(flowFile, getURI(context));
    +
             session.transfer(flowFile, REL_SUCCESS);
    +        if (doCommit) {
    +            session.commit();
    +        }
    --- End diff --
    
    Yeah, that's what I wanted to understand. Along with a flowfile per batch, 
it writes the progress (if enabled)? LGTM. +1


> Make GetMongo support multiple commits and give some progress indication
> ------------------------------------------------------------------------
>
>                 Key: NIFI-4838
>                 URL: https://issues.apache.org/jira/browse/NIFI-4838
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Mike Thomsen
>            Assignee: Mike Thomsen
>            Priority: Major
>
> It shouldn't wait until the end to do a commit() call because the effect is 
> that GetMongo looks like it has hung to a user who is pulling a very large 
> data set.
> It should also have an option for running a count query to get the current 
> approximate count of documents that would match the query and append an 
> attribute that indicates where a flowfile stands in the total result count. 
> Ex:
> query.progress.point.start = 2500
> query.progress.point.end = 5000
> query.count.estimate = 17,568,231



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to