This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 4de51fd NIFI-6148 - Added support for aggregation option 'allowDiskUse'
4de51fd is described below
commit 4de51fd3d5ac9240540f5738858ce0c8cf8a0bab
Author: Craig Davidson <[email protected]>
AuthorDate: Mon Mar 25 20:40:33 2019 +0000
NIFI-6148 - Added support for aggregation option 'allowDiskUse'
This closes #3389
Signed-off-by: Mike Thomsen <[email protected]>
---
.../nifi/processors/mongodb/RunMongoAggregation.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
index c9e6d56..3352ab0 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
@@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
import org.bson.Document;
import org.bson.conversions.Bson;
@@ -91,11 +92,23 @@ public class RunMongoAggregation extends
AbstractMongoProcessor {
.addValidator(JsonValidator.INSTANCE)
.build();
+ static final PropertyDescriptor ALLOW_DISK_USE = new
PropertyDescriptor.Builder()
+ .name("allow-disk-use")
+ .displayName("Allow Disk Use")
+ .description("Set this to true to enable writing data to temporary
files to prevent exceeding the " +
+ "maximum memory use limit during aggregation pipeline
stages when handling large datasets.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(CHARSET);
_propertyDescriptors.add(QUERY);
+ _propertyDescriptors.add(ALLOW_DISK_USE);
_propertyDescriptors.add(JSON_TYPE);
_propertyDescriptors.add(QUERY_ATTRIBUTE);
_propertyDescriptors.add(BATCH_SIZE);
@@ -145,6 +158,7 @@ public class RunMongoAggregation extends
AbstractMongoProcessor {
}
final String query =
context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
+ final Boolean allowDiskUse =
context.getProperty(ALLOW_DISK_USE).asBoolean();
final String queryAttr =
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
final Integer resultsPerFlowfile =
context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
@@ -163,7 +177,7 @@ public class RunMongoAggregation extends
AbstractMongoProcessor {
try {
MongoCollection<Document> collection = getCollection(context,
flowFile);
List<Bson> aggQuery = buildAggregationQuery(query);
- AggregateIterable<Document> it = collection.aggregate(aggQuery);
+ AggregateIterable<Document> it =
collection.aggregate(aggQuery).allowDiskUse(allowDiskUse);;
it.batchSize(batchSize != null ? batchSize : 1);
iter = it.iterator();