Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2448#discussion_r165994104
  
    --- Diff: nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---
    @@ -221,22 +258,33 @@ private void configureMapper(String setting) {
             }
         }
     
    -    private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
    -        return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
    +    private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
    +        return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
                     : mapper.writer();
         }
     
    -    private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
    +    private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
             FlowFile flowFile = session.create();
             flowFile = session.write(flowFile, new OutputStreamCallback() {
                 @Override
                 public void process(OutputStream out) throws IOException {
                     out.write(payload.getBytes("UTF-8"));
                 }
             });
    -        flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +        Map<String, String> attrs = new HashMap<>();
    +        attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
    +        if (count != null) {
    +            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
    +            attrs.put(PROGRESS_END, String.valueOf(index));
    +            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
    +        }
    +        flowFile = session.putAllAttributes(flowFile, attrs);
             session.getProvenanceReporter().receive(flowFile, getURI(context));
    +
             session.transfer(flowFile, REL_SUCCESS);
    +        if (doCommit) {
    +            session.commit();
    +        }
    --- End diff --
    
    Is the motivation behind this change to add these progress attributes to
    the flowfiles and to commit after every batch? I'm asking because the
    associated JIRA says the current GetMongo gives the perception "that it
    has hung to a user who is pulling a very large data set", but the current
    version also transfers a flowfile for every batch, right? If the intent of
    that JIRA is to add progress to the flowfiles and commit after every batch,
    then this looks good to me.
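
    To make the question concrete, here is a minimal sketch of the attribute
    map the new writeBatch appears to build for one batch. It is an
    illustration under assumptions, not the PR's code: the class name, the
    build method, and the three attribute key strings are placeholders, since
    the hunk shows only the constant names PROGRESS_START, PROGRESS_END and
    PROGRESS_ESTIMATE and not their values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mirrors the attribute logic in the diff above.
class ProgressAttributes {
    // Assumed key strings; the real values are not visible in the hunk.
    static final String PROGRESS_START = "progress.start";
    static final String PROGRESS_END = "progress.end";
    static final String PROGRESS_ESTIMATE = "progress.estimate";

    // count is the estimated total (null when no estimate was requested),
    // index is the running document count after this batch,
    // batchSize is the number of documents per batch.
    static Map<String, String> build(Long count, long index, int batchSize) {
        Map<String, String> attrs = new LinkedHashMap<>();
        // "mime.type" is the key returned by CoreAttributes.MIME_TYPE.key().
        attrs.put("mime.type", "application/json");
        if (count != null) {
            attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
            attrs.put(PROGRESS_END, String.valueOf(index));
            attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
        }
        return attrs;
    }

    public static void main(String[] args) {
        // First batch of 100 documents out of an estimated 250.
        System.out.println(build(250L, 100L, 100));
        // No estimate available: only the MIME type is set.
        System.out.println(build(null, 100L, 100));
    }
}
```

    With count = 250, index = 100 and batchSize = 100, the start attribute is
    0 and the end attribute is 100. When count is null, only mime.type is set.
    Whether the last, possibly partial batch passes its real size as batchSize
    is not visible in this hunk.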

