Github user zenfenan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2448#discussion_r166320282
--- Diff:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
---
@@ -221,22 +258,33 @@ private void configureMapper(String setting) {
}
}
- private ObjectWriter getObjectWriter(ObjectMapper mapper, String
ppSetting) {
- return ppSetting.equals(YES_PP.getValue()) ?
mapper.writerWithDefaultPrettyPrinter()
+ private ObjectWriter getObjectWriter(ObjectMapper mapper, boolean
ppSetting) {
+ return ppSetting ? mapper.writerWithDefaultPrettyPrinter()
: mapper.writer();
}
- private void writeBatch(String payload, ProcessContext context,
ProcessSession session) {
+ private void writeBatch(String payload, ProcessContext context,
ProcessSession session, boolean doCommit, Long count, long index, int
batchSize) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(payload.getBytes("UTF-8"));
}
});
- flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), "application/json");
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+ if (count != null) {
+ attrs.put(PROGRESS_START, String.valueOf(index - batchSize));
+ attrs.put(PROGRESS_END, String.valueOf(index));
+ attrs.put(PROGRESS_ESTIMATE, String.valueOf(count));
+ }
+ flowFile = session.putAllAttributes(flowFile, attrs);
session.getProvenanceReporter().receive(flowFile, getURI(context));
+
session.transfer(flowFile, REL_SUCCESS);
+ if (doCommit) {
+ session.commit();
+ }
--- End diff --
Yeah, that's what I wanted to understand. So, along with emitting a flowfile per batch,
it also writes the progress attributes (if enabled)? LGTM. +1
---