GitHub user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r165994104
--- Diff:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
}
}
- private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) {
- return ppSetting.equals(YES_PP.getValue()) ? mapper.writerWithDefaultPrettyPrinter()
+ private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean ppSetting) {
+ return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
: mapper.writer();
}
- private void writeBatch(String payload, ProcessContext context, ProcessSession session) {
+ private void writeBatch(String payload, ProcessContext context, ProcessSession session, boolean doCommit, Long count, long index, int batchSize) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(payload.getBytes("UTF-8"));
}
});
- flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+ if (count != null) {
+ attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+ attrs.put(PROGRESS_END, String.valueOf(index));
+ attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+ }
+ flowFile = session.putAllAttributes(flowFile, attrs);
session.getProvenanceReporter().receive(flowFile, getURI(context));
+
session.transfer(flowFile, REL_SUCCESS);
+ if (doCommit) {
+ session.commit();
+ }
--- End diff --
Is the motivation behind this change to add these progress attributes to
the flowfiles and to commit the session after every batch? I'm asking because
the associated JIRA says the current GetMongo creates the perception "that it
has hung to a user who is pulling a very large data set", but the current
version also transfers a flowfile for every batch, right? If the intent of that
JIRA is to add progress attributes to the flowfiles and commit after every
batch, then this looks good to me.
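One likely reason per-batch commits matter: in NiFi, a FlowFile passed to `session.transfer()` is not released to the downstream connection until the session is committed, and for a processor built on `AbstractProcessor` that commit normally happens once, after `onTrigger` returns. Transferring one flowfile per batch therefore does not, on its own, let downstream processors see results incrementally. The toy model below is a sketch, not the NiFi API: `ToySession`, `run`, `progressAttributes` and the `progress.*` keys are hypothetical stand-ins (the real values of the `PROGRESS_*` constants are not shown in the diff), and it assumes `index` counts the documents read so far. It also uses each batch's actual length, rather than the configured batch size, for the start offset, so a short final batch still gets the right range.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT the NiFi API. The "progress.*" keys are hypothetical stand-ins
// for the PR's PROGRESS_START / PROGRESS_END / PROGRESS_ESTIMATE constants.
public class BatchCommitSketch {

    /** Stages transferred "flowfiles" (attribute maps); they reach downstream only on commit(). */
    static class ToySession {
        final List<Map<String, String>> staged = new ArrayList<>();
        final List<Map<String, String>> downstream = new ArrayList<>();

        void transfer(Map<String, String> attrs) {
            staged.add(attrs);
        }

        void commit() {
            downstream.addAll(staged);
            staged.clear();
        }
    }

    /** Mirrors the diff's attribute logic, but offsets the start by the batch's actual length. */
    static Map<String, String> progressAttributes(Long count, long index, int batchLen) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("mime.type", "application/json"); // key of CoreAttributes.MIME_TYPE
        if (count != null) { // no estimate available: skip progress attributes
            attrs.put("progress.start", String.valueOf(index - batchLen));
            attrs.put("progress.end", String.valueOf(index));
            attrs.put("progress.estimate", String.valueOf(count));
        }
        return attrs;
    }

    /**
     * Reads totalDocs documents in batches of batchSize and records how many
     * batches downstream could see right after each batch was transferred.
     */
    static List<Integer> run(ToySession session, long totalDocs, int batchSize, Long count, boolean doCommit) {
        List<Integer> visible = new ArrayList<>();
        long index = 0; // documents read so far
        while (index < totalDocs) {
            int batchLen = (int) Math.min(batchSize, totalDocs - index);
            index += batchLen;
            session.transfer(progressAttributes(count, index, batchLen));
            if (doCommit) {
                session.commit();
            }
            visible.add(session.downstream.size());
        }
        session.commit(); // mimics the commit AbstractProcessor performs after onTrigger returns
        return visible;
    }

    public static void main(String[] args) {
        System.out.println("commit at end:      " + run(new ToySession(), 250, 100, 250L, false));
        ToySession perBatch = new ToySession();
        System.out.println("commit every batch: " + run(perBatch, 250, 100, 250L, true));
        perBatch.downstream.forEach(System.out::println);
    }
}
```

With 250 documents and a batch size of 100, the commit-at-end run reports `[0, 0, 0]` (nothing is visible until the final commit), the per-batch run reports `[1, 2, 3]`, and the last batch carries `progress.start=200` and `progress.end=250`.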
---