GitHub user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2958#discussion_r213979035
  
    --- Diff: 
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 ---
    @@ -204,144 +212,145 @@ private ObjectWriter getObjectWriter(ObjectMapper 
mapper, String ppSetting) {
         @Override
         public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
             FlowFile input = null;
    +        logger = getLogger();
    +
             if (context.hasIncomingConnection()) {
                 input = session.get();
    -
                 if (input == null && context.hasNonLoopConnection()) {
                     return;
                 }
             }
     
    -        final ComponentLog logger = getLogger();
    +        final Document query = getQuery(context, session, input );
     
    -        Map<String, String> attributes = new HashMap<>();
    -        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
    +        if (query == null) {
    +            return;
    +        }
     
    -        final Document query;
    +        final String jsonTypeSetting = 
context.getProperty(JSON_TYPE).getValue();
    +        final String usePrettyPrint  = 
context.getProperty(USE_PRETTY_PRINTING).getValue();
             final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
    +        final Map<String, String> attributes = new HashMap<>();
     
    -        String queryStr;
    -        if (context.getProperty(QUERY).isSet()) {
    -            queryStr = 
context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
    -            query = Document.parse(queryStr);
    -        } else if (!context.getProperty(QUERY).isSet() && input == null) {
    -            queryStr = "{}";
    -            query = Document.parse("{}");
    -        } else {
    -            try {
    -                ByteArrayOutputStream out = new ByteArrayOutputStream();
    -                session.exportTo(input, out);
    -                out.close();
    -                queryStr = new String(out.toByteArray());
    -                query = Document.parse(queryStr);
    -            } catch (Exception ex) {
    -                getLogger().error("Error reading flowfile", ex);
    -                if (input != null) { //Likely culprit is a bad query
    -                    session.transfer(input, REL_FAILURE);
    -                    return;
    -                } else {
    -                    throw new ProcessException(ex);
    -                }
    -            }
    -        }
    +        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
     
             if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
                 final String queryAttr = 
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
    -            attributes.put(queryAttr, queryStr);
    +            attributes.put(queryAttr, query.toJson());
             }
     
             final Document projection = context.getProperty(PROJECTION).isSet()
                     ? 
Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue())
 : null;
             final Document sort = context.getProperty(SORT).isSet()
                     ? 
Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue())
 : null;
    -        final String jsonTypeSetting = 
context.getProperty(JSON_TYPE).getValue();
    -        final String usePrettyPrint  = 
context.getProperty(USE_PRETTY_PRINTING).getValue();
    -        configureMapper(jsonTypeSetting);
     
    +        final MongoCollection<Document> collection = 
getCollection(context, input);
    +        final FindIterable<Document> it = collection.find(query);
     
    -        try {
    -            final MongoCollection<Document> collection = 
getCollection(context, input);
    +        attributes.put(DB_NAME, 
collection.getNamespace().getDatabaseName());
    +        attributes.put(COL_NAME, 
collection.getNamespace().getCollectionName());
     
    -            attributes.put(DB_NAME, 
collection.getNamespace().getDatabaseName());
    -            attributes.put(COL_NAME, 
collection.getNamespace().getCollectionName());
    +        if (projection != null) {
    +            it.projection(projection);
    +        }
    +        if (sort != null) {
    +            it.sort(sort);
    +        }
    +        if (context.getProperty(LIMIT).isSet()) {
    +            
it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
    +        }
    +        if (context.getProperty(BATCH_SIZE).isSet()) {
    +            
it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
    +        }
     
    -            final FindIterable<Document> it = query != null ? 
collection.find(query) : collection.find();
    -            if (projection != null) {
    -                it.projection(projection);
    -            }
    -            if (sort != null) {
    -                it.sort(sort);
    -            }
    -            if (context.getProperty(LIMIT).isSet()) {
    -                
it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
    -            }
    -            if (context.getProperty(BATCH_SIZE).isSet()) {
    -                
it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
    +        try (MongoCursor<Document> cursor = it.iterator()) {
    +            final List<Document> listOfDocuments = new ArrayList<>();
    +            Document doc;
    +
    +            configureMapper(jsonTypeSetting);
    +
    +            while ((doc = cursor.tryNext()) != null) {
    --- End diff --
    
    We can't load the entire result set into a list like this, because a large Mongo result set can easily overflow the heap.


---

Reply via email to