This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4de51fd  NIFI-6148 - Added support for aggregation option 
'allowDiskUse'
4de51fd is described below

commit 4de51fd3d5ac9240540f5738858ce0c8cf8a0bab
Author: Craig Davidson <[email protected]>
AuthorDate: Mon Mar 25 20:40:33 2019 +0000

    NIFI-6148 - Added support for aggregation option 'allowDiskUse'
    
    This closes #3389
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../nifi/processors/mongodb/RunMongoAggregation.java     | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
index c9e6d56..3352ab0 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
@@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 
@@ -91,11 +92,23 @@ public class RunMongoAggregation extends 
AbstractMongoProcessor {
             .addValidator(JsonValidator.INSTANCE)
             .build();
 
+    static final PropertyDescriptor ALLOW_DISK_USE = new 
PropertyDescriptor.Builder()
+            .name("allow-disk-use")
+            .displayName("Allow Disk Use")
+            .description("Set this to true to enable writing data to temporary 
files to prevent exceeding the " +
+                    "maximum memory use limit during aggregation pipeline 
staged when handling large datasets.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(descriptors);
         _propertyDescriptors.add(CHARSET);
         _propertyDescriptors.add(QUERY);
+        _propertyDescriptors.add(ALLOW_DISK_USE);
         _propertyDescriptors.add(JSON_TYPE);
         _propertyDescriptors.add(QUERY_ATTRIBUTE);
         _propertyDescriptors.add(BATCH_SIZE);
@@ -145,6 +158,7 @@ public class RunMongoAggregation extends 
AbstractMongoProcessor {
         }
 
         final String query = 
context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        final Boolean allowDiskUse = 
context.getProperty(ALLOW_DISK_USE).asBoolean();
         final String queryAttr = 
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
         final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final Integer resultsPerFlowfile = 
context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
@@ -163,7 +177,7 @@ public class RunMongoAggregation extends 
AbstractMongoProcessor {
         try {
             MongoCollection<Document> collection = getCollection(context, 
flowFile);
             List<Bson> aggQuery = buildAggregationQuery(query);
-            AggregateIterable<Document> it = collection.aggregate(aggQuery);
+            AggregateIterable<Document> it = 
collection.aggregate(aggQuery).allowDiskUse(allowDiskUse);;
             it.batchSize(batchSize != null ? batchSize : 1);
 
             iter = it.iterator();

Reply via email to