GitHub user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2958#discussion_r213979035
--- Diff:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
---
@@ -204,144 +212,145 @@ private ObjectWriter getObjectWriter(ObjectMapper
mapper, String ppSetting) {
@Override
public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
FlowFile input = null;
+ logger = getLogger();
+
if (context.hasIncomingConnection()) {
input = session.get();
-
if (input == null && context.hasNonLoopConnection()) {
return;
}
}
- final ComponentLog logger = getLogger();
+ final Document query = getQuery(context, session, input );
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+ if (query == null) {
+ return;
+ }
- final Document query;
+ final String jsonTypeSetting =
context.getProperty(JSON_TYPE).getValue();
+ final String usePrettyPrint =
context.getProperty(USE_PRETTY_PRINTING).getValue();
final Charset charset =
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
+ final Map<String, String> attributes = new HashMap<>();
- String queryStr;
- if (context.getProperty(QUERY).isSet()) {
- queryStr =
context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
- query = Document.parse(queryStr);
- } else if (!context.getProperty(QUERY).isSet() && input == null) {
- queryStr = "{}";
- query = Document.parse("{}");
- } else {
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- session.exportTo(input, out);
- out.close();
- queryStr = new String(out.toByteArray());
- query = Document.parse(queryStr);
- } catch (Exception ex) {
- getLogger().error("Error reading flowfile", ex);
- if (input != null) { //Likely culprit is a bad query
- session.transfer(input, REL_FAILURE);
- return;
- } else {
- throw new ProcessException(ex);
- }
- }
- }
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
final String queryAttr =
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
- attributes.put(queryAttr, queryStr);
+ attributes.put(queryAttr, query.toJson());
}
final Document projection = context.getProperty(PROJECTION).isSet()
?
Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue())
: null;
final Document sort = context.getProperty(SORT).isSet()
?
Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue())
: null;
- final String jsonTypeSetting =
context.getProperty(JSON_TYPE).getValue();
- final String usePrettyPrint =
context.getProperty(USE_PRETTY_PRINTING).getValue();
- configureMapper(jsonTypeSetting);
+ final MongoCollection<Document> collection =
getCollection(context, input);
+ final FindIterable<Document> it = collection.find(query);
- try {
- final MongoCollection<Document> collection =
getCollection(context, input);
+ attributes.put(DB_NAME,
collection.getNamespace().getDatabaseName());
+ attributes.put(COL_NAME,
collection.getNamespace().getCollectionName());
- attributes.put(DB_NAME,
collection.getNamespace().getDatabaseName());
- attributes.put(COL_NAME,
collection.getNamespace().getCollectionName());
+ if (projection != null) {
+ it.projection(projection);
+ }
+ if (sort != null) {
+ it.sort(sort);
+ }
+ if (context.getProperty(LIMIT).isSet()) {
+
it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
+ }
+ if (context.getProperty(BATCH_SIZE).isSet()) {
+
it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
+ }
- final FindIterable<Document> it = query != null ?
collection.find(query) : collection.find();
- if (projection != null) {
- it.projection(projection);
- }
- if (sort != null) {
- it.sort(sort);
- }
- if (context.getProperty(LIMIT).isSet()) {
-
it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
- }
- if (context.getProperty(BATCH_SIZE).isSet()) {
-
it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
+ try (MongoCursor<Document> cursor = it.iterator()) {
+ final List<Document> listOfDocuments = new ArrayList<>();
+ Document doc;
+
+ configureMapper(jsonTypeSetting);
+
+ while ((doc = cursor.tryNext()) != null) {
--- End diff --
We can't load the entire result set into a list like this, because a Mongo
result set can easily be large enough to exhaust the heap.
---