exceptionfactory commented on code in PR #6918:
URL: https://github.com/apache/nifi/pull/6918#discussion_r1342697444


##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")

Review Comment:
   Recommend renaming to `Transactions Enabled`:
   ```suggestion
       static final PropertyDescriptor TRANSACTIONS_ENABLED = new 
PropertyDescriptor.Builder()
               .name("Transactions Enabled")
   ```



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")

Review Comment:
   It would be helpful to change the name of this property to make the purpose 
clearer. What do you think of `Execution Order`, with an enumeration of values, 
`ORDERED` and `RANDOM`?



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Run all actions in one MongoDB transaction")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(USE_TRANSACTION);
+        _propertyDescriptors.add(CHARACTER_SET);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+
+        final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final WriteConcern writeConcern = clientService.getWriteConcern();
+
+        ClientSession clientSession = null;
+        try {
+            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+            // Read the contents of the FlowFile into a byte array
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            session.exportTo(flowFile, os);
+
+            // parse
+            final BsonArray updateItems = 
BsonArray.parse(os.toString(charset));
+
+            List<WriteModel<Document>> updateModels = new ArrayList<>();
+            for (Object item : updateItems) {
+                final BsonDocument updateItem = (BsonDocument) item;
+                if (updateItem.keySet().size() != 1) {
+                    logger.error("Invalid bulk-update: more than one type 
given {}", String.join(", ", updateItem.keySet()));
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();
+                    return;
+                }
+                final String updateType = 
updateItem.keySet().iterator().next();
+                final BsonDocument updateSpec = (BsonDocument) 
updateItem.get(updateType);
+                final WriteModel<Document> writeModel;
+                if ("insertOne".equals(updateType)) {
+                    writeModel = new 
InsertOneModel<>(toBsonDocument((BsonDocument) updateSpec.get("document")));
+                } else if ("updateOne".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateOneModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("updateMany".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateManyModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("replaceOne".equals(updateType)) {
+                    final ReplaceOptions options = 
parseReplaceOptions(updateSpec);
+                    writeModel = new ReplaceOneModel<>((BsonDocument) 
updateSpec.get("filter"),
+                            toBsonDocument((BsonDocument) 
updateSpec.get("replacement")), options);
+                } else if ("deleteOne".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteOneModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else  if ("deleteMany".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteManyModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else {

Review Comment:
   It looks like this conditional logic could be broken out into a separate 
method that returns a `WriteModel`, or `null` on failure. That could make the 
`onTrigger` method easier to maintain.



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Run all actions in one MongoDB transaction")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(USE_TRANSACTION);
+        _propertyDescriptors.add(CHARACTER_SET);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+
+        final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final WriteConcern writeConcern = clientService.getWriteConcern();
+
+        ClientSession clientSession = null;
+        try {
+            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+            // Read the contents of the FlowFile into a byte array
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            session.exportTo(flowFile, os);
+
+            // parse
+            final BsonArray updateItems = 
BsonArray.parse(os.toString(charset));

Review Comment:
   Instead of exporting the FlowFile to a string, the `BsonArrayCodec` has a 
`decode` method that takes a `BsonReader`. There is a bit more involved in 
setting it up, but it would allow reading from a `java.io.Reader`, which could 
be an instance of `InputStreamReader`. Although the parsed objects will be 
loaded into memory, it would avoid reading the FlowFile content as a string.



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Run all actions in one MongoDB transaction")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(USE_TRANSACTION);
+        _propertyDescriptors.add(CHARACTER_SET);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the immutable set of relationships (success and failure)
+     *         built in the static initializer
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the immutable list of supported property descriptors
+     *         built in the static initializer
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+
+        final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final WriteConcern writeConcern = clientService.getWriteConcern();
+
+        ClientSession clientSession = null;
+        try {
+            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+            // Read the contents of the FlowFile into a byte array
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            session.exportTo(flowFile, os);
+
+            // parse
+            final BsonArray updateItems = 
BsonArray.parse(os.toString(charset));
+
+            List<WriteModel<Document>> updateModels = new ArrayList<>();
+            for (Object item : updateItems) {
+                final BsonDocument updateItem = (BsonDocument) item;
+                if (updateItem.keySet().size() != 1) {
+                    logger.error("Invalid bulk-update: more than one type 
given {}", String.join(", ", updateItem.keySet()));
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();
+                    return;
+                }
+                final String updateType = 
updateItem.keySet().iterator().next();
+                final BsonDocument updateSpec = (BsonDocument) 
updateItem.get(updateType);
+                final WriteModel<Document> writeModel;
+                if ("insertOne".equals(updateType)) {
+                    writeModel = new 
InsertOneModel<>(toBsonDocument((BsonDocument) updateSpec.get("document")));
+                } else if ("updateOne".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateOneModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("updateMany".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateManyModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("replaceOne".equals(updateType)) {
+                    final ReplaceOptions options = 
parseReplaceOptions(updateSpec);
+                    writeModel = new ReplaceOneModel<>((BsonDocument) 
updateSpec.get("filter"),
+                            toBsonDocument((BsonDocument) 
updateSpec.get("replacement")), options);
+                } else if ("deleteOne".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteOneModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else  if ("deleteMany".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteManyModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else {
+                    logger.error("Invalid bulk-update: invalid update type 
{}", updateType);
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();
+                    return;
+                }
+                updateModels.add(writeModel);
+            }
+
+            if (context.getProperty(USE_TRANSACTION).asBoolean()) {
+                clientSession = clientService.startSession();
+                clientSession.startTransaction();
+                // now run this w/in a transaction
+                collection.bulkWrite(clientSession, updateModels, (new 
BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean())));
+            } else {
+                collection.bulkWrite(updateModels, (new 
BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean())));
+            }
+            logger.info("bulk-updated {} into MongoDB", flowFile);
+            // (could also return the result again as JSON - mostly not needed 
afaik)

Review Comment:
   This comment could be removed.



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Run all actions in one MongoDB transaction")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(USE_TRANSACTION);
+        _propertyDescriptors.add(CHARACTER_SET);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the immutable set of relationships (success and failure)
+     *         built in the static initializer
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the immutable list of supported property descriptors
+     *         built in the static initializer
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+
+        final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final WriteConcern writeConcern = clientService.getWriteConcern();
+
+        ClientSession clientSession = null;
+        try {
+            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+            // Read the contents of the FlowFile into a byte array
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            session.exportTo(flowFile, os);
+
+            // parse
+            final BsonArray updateItems = 
BsonArray.parse(os.toString(charset));
+
+            List<WriteModel<Document>> updateModels = new ArrayList<>();
+            for (Object item : updateItems) {
+                final BsonDocument updateItem = (BsonDocument) item;
+                if (updateItem.keySet().size() != 1) {
+                    logger.error("Invalid bulk-update: more than one type 
given {}", String.join(", ", updateItem.keySet()));
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();
+                    return;
+                }
+                final String updateType = 
updateItem.keySet().iterator().next();
+                final BsonDocument updateSpec = (BsonDocument) 
updateItem.get(updateType);
+                final WriteModel<Document> writeModel;
+                if ("insertOne".equals(updateType)) {
+                    writeModel = new 
InsertOneModel<>(toBsonDocument((BsonDocument) updateSpec.get("document")));
+                } else if ("updateOne".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateOneModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("updateMany".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateManyModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("replaceOne".equals(updateType)) {
+                    final ReplaceOptions options = 
parseReplaceOptions(updateSpec);
+                    writeModel = new ReplaceOneModel<>((BsonDocument) 
updateSpec.get("filter"),
+                            toBsonDocument((BsonDocument) 
updateSpec.get("replacement")), options);
+                } else if ("deleteOne".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteOneModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else  if ("deleteMany".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteManyModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else {
+                    logger.error("Invalid bulk-update: invalid update type 
{}", updateType);
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();
+                    return;
+                }
+                updateModels.add(writeModel);
+            }
+
+            if (context.getProperty(USE_TRANSACTION).asBoolean()) {
+                clientSession = clientService.startSession();
+                clientSession.startTransaction();
+                // now run this w/in a transaction
+                collection.bulkWrite(clientSession, updateModels, (new 
BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean())));
+            } else {
+                collection.bulkWrite(updateModels, (new 
BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean())));
+            }
+            logger.info("bulk-updated {} into MongoDB", flowFile);
+            // (could also return the result again as JSON - mostly not needed 
afaik)
+
+            session.getProvenanceReporter().send(flowFile, getURI(context));
+            session.transfer(flowFile, REL_SUCCESS);
+
+            if (clientSession != null) {
+                if (clientSession.hasActiveTransaction()) {
+                    clientSession.commitTransaction();
+                }
+                clientSession.close();
+            }
+        } catch (Exception e) {
+            logger.error("Failed to bulk-update {} into MongoDB", flowFile, e);
+            session.transfer(flowFile, REL_FAILURE);
+            context.yield();

Review Comment:
   Is there a reason for this yield?



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Run all actions in one MongoDB transaction")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(USE_TRANSACTION);
+        _propertyDescriptors.add(CHARACTER_SET);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the immutable set of relationships (success and failure)
+     *         built in the static initializer
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the immutable list of supported property descriptors
+     *         built in the static initializer
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+
+        final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final WriteConcern writeConcern = clientService.getWriteConcern();
+
+        ClientSession clientSession = null;
+        try {
+            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+            // Read the contents of the FlowFile into a byte array
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            session.exportTo(flowFile, os);
+
+            // parse
+            final BsonArray updateItems = 
BsonArray.parse(os.toString(charset));
+
+            List<WriteModel<Document>> updateModels = new ArrayList<>();
+            for (Object item : updateItems) {
+                final BsonDocument updateItem = (BsonDocument) item;
+                if (updateItem.keySet().size() != 1) {
+                    logger.error("Invalid bulk-update: more than one type 
given {}", String.join(", ", updateItem.keySet()));
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();
+                    return;
+                }
+                final String updateType = 
updateItem.keySet().iterator().next();
+                final BsonDocument updateSpec = (BsonDocument) 
updateItem.get(updateType);
+                final WriteModel<Document> writeModel;
+                if ("insertOne".equals(updateType)) {
+                    writeModel = new 
InsertOneModel<>(toBsonDocument((BsonDocument) updateSpec.get("document")));
+                } else if ("updateOne".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateOneModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("updateMany".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateManyModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("replaceOne".equals(updateType)) {
+                    final ReplaceOptions options = 
parseReplaceOptions(updateSpec);
+                    writeModel = new ReplaceOneModel<>((BsonDocument) 
updateSpec.get("filter"),
+                            toBsonDocument((BsonDocument) 
updateSpec.get("replacement")), options);
+                } else if ("deleteOne".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteOneModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else  if ("deleteMany".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteManyModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else {
+                    logger.error("Invalid bulk-update: invalid update type 
{}", updateType);
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();
+                    return;
+                }
+                updateModels.add(writeModel);
+            }
+
+            if (context.getProperty(USE_TRANSACTION).asBoolean()) {
+                clientSession = clientService.startSession();
+                clientSession.startTransaction();
+                // now run this w/in a transaction
+                collection.bulkWrite(clientSession, updateModels, (new 
BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean())));
+            } else {
+                collection.bulkWrite(updateModels, (new 
BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean())));
+            }
+            logger.info("bulk-updated {} into MongoDB", flowFile);
+            // (could also return the result again as JSON - mostly not needed 
afaik)
+
+            session.getProvenanceReporter().send(flowFile, getURI(context));
+            session.transfer(flowFile, REL_SUCCESS);
+
+            if (clientSession != null) {
+                if (clientSession.hasActiveTransaction()) {
+                    clientSession.commitTransaction();
+                }
+                clientSession.close();
+            }
+        } catch (Exception e) {
+            logger.error("Failed to bulk-update {} into MongoDB", flowFile, e);
+            session.transfer(flowFile, REL_FAILURE);
+            context.yield();
+            try {
+                if (clientSession != null) {
+                    if (clientSession.hasActiveTransaction()) {
+                        clientSession.abortTransaction();
+                    }
+                    clientSession.close();
+                }
+            } catch (Exception ee) {
+                logger.error("Cannot rollback client session", ee); // (but no 
further action)

Review Comment:
   Recommend changing this to a warning since no further action is possible.



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven

Review Comment:
   The `@EventDriven` annotation is deprecated and has already been removed from the main branch, so it should not be used in this new processor.



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Run all actions in one MongoDB transaction")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(USE_TRANSACTION);
+        _propertyDescriptors.add(CHARACTER_SET);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+
+        final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final WriteConcern writeConcern = clientService.getWriteConcern();
+
+        ClientSession clientSession = null;
+        try {
+            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+            // Read the contents of the FlowFile into a byte array
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            session.exportTo(flowFile, os);
+
+            // parse
+            final BsonArray updateItems = 
BsonArray.parse(os.toString(charset));
+
+            List<WriteModel<Document>> updateModels = new ArrayList<>();
+            for (Object item : updateItems) {
+                final BsonDocument updateItem = (BsonDocument) item;
+                if (updateItem.keySet().size() != 1) {
+                    logger.error("Invalid bulk-update: more than one type 
given {}", String.join(", ", updateItem.keySet()));
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();
+                    return;
+                }
+                final String updateType = 
updateItem.keySet().iterator().next();
+                final BsonDocument updateSpec = (BsonDocument) 
updateItem.get(updateType);
+                final WriteModel<Document> writeModel;
+                if ("insertOne".equals(updateType)) {
+                    writeModel = new 
InsertOneModel<>(toBsonDocument((BsonDocument) updateSpec.get("document")));
+                } else if ("updateOne".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateOneModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("updateMany".equals(updateType)) {
+                    final UpdateOptions options = 
parseUpdateOptions(updateSpec);
+                    writeModel = new UpdateManyModel<>((BsonDocument) 
updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options);
+                } else if ("replaceOne".equals(updateType)) {
+                    final ReplaceOptions options = 
parseReplaceOptions(updateSpec);
+                    writeModel = new ReplaceOneModel<>((BsonDocument) 
updateSpec.get("filter"),
+                            toBsonDocument((BsonDocument) 
updateSpec.get("replacement")), options);
+                } else if ("deleteOne".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteOneModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else  if ("deleteMany".equals(updateType)) {
+                    final DeleteOptions options = 
parseDeleteOptions(updateSpec);
+                    writeModel = new DeleteManyModel<>((BsonDocument) 
updateSpec.get("filter"), options);
+                } else {
+                    logger.error("Invalid bulk-update: invalid update type 
{}", updateType);

Review Comment:
   It would be helpful to include the FlowFile reference in this error log message, for traceability.



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Run all actions in one MongoDB transaction")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(USE_TRANSACTION);
+        _propertyDescriptors.add(CHARACTER_SET);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();

Review Comment:
   Declaring this method-local variable is not necessary; recommend calling 
`getLogger()` directly where needed.



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Run all actions in one MongoDB transaction")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(descriptors);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(USE_TRANSACTION);
+        _propertyDescriptors.add(CHARACTER_SET);
+        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+
+        final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final WriteConcern writeConcern = clientService.getWriteConcern();
+
+        ClientSession clientSession = null;
+        try {
+            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+            // Read the contents of the FlowFile into a byte array
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            session.exportTo(flowFile, os);
+
+            // parse
+            final BsonArray updateItems = 
BsonArray.parse(os.toString(charset));
+
+            List<WriteModel<Document>> updateModels = new ArrayList<>();
+            for (Object item : updateItems) {
+                final BsonDocument updateItem = (BsonDocument) item;
+                if (updateItem.keySet().size() != 1) {
+                    logger.error("Invalid bulk-update: more than one type 
given {}", String.join(", ", updateItem.keySet()));
+                    session.transfer(flowFile, REL_FAILURE);
+                    context.yield();

Review Comment:
   Is there a reason for calling `context.yield()` here? Yielding is intended as a backoff when the same work will be retried, but this FlowFile is routed to failure and will not be reprocessed.



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Collation;
+import com.mongodb.client.model.CollationAlternate;
+import com.mongodb.client.model.CollationCaseFirst;
+import com.mongodb.client.model.CollationMaxVariable;
+import com.mongodb.client.model.CollationStrength;
+import com.mongodb.client.model.DeleteManyModel;
+import com.mongodb.client.model.DeleteOneModel;
+import com.mongodb.client.model.DeleteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.ReplaceOneModel;
+import com.mongodb.client.model.ReplaceOptions;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as 
bulk-update")
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+public class PutMongoBulkOperations extends AbstractMongoProcessor {
    // Route for FlowFiles whose bulk operations were applied successfully.
+    static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
    // Route for FlowFiles that could not be written to MongoDB (e.g. invalid bulk-update content).
+    static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
+
    // When true, operations run in order and the bulk write stops at the first error;
    // when false, order is arbitrary and remaining operations continue on error.
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("Ordered")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Ordered execution of bulk-writes & break on error - 
otherwise arbitrary order & continue on error")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
    // When true, all bulk operations are executed inside one MongoDB transaction
    // (a ClientSession is opened in onTrigger).
+    static final PropertyDescriptor USE_TRANSACTION = new 
PropertyDescriptor.Builder()
+            .name("Use transaction")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .description("Run all actions in one MongoDB transaction")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
    // Charset used to decode the FlowFile content before BSON parsing in onTrigger.
+    static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character Set in which the data is encoded")
+        .required(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
    // Immutable views populated once in the static initializer below.
+    private final static Set<Relationship> relationships;
+    private final static List<PropertyDescriptor> propertyDescriptors;
+
/*
 * Builds the immutable property-descriptor list and relationship set once per
 * class load. The shared descriptors (presumably inherited from
 * AbstractMongoProcessor -- declared outside this block) come first, followed
 * by the bulk-specific properties.
 */
static {
    // Copy-construct instead of empty-construct + addAll; camelCase locals
    // instead of underscore-prefixed names, per Java naming conventions.
    final List<PropertyDescriptor> props = new ArrayList<>(descriptors);
    props.add(ORDERED);
    props.add(USE_TRANSACTION);
    props.add(CHARACTER_SET);
    propertyDescriptors = Collections.unmodifiableList(props);

    final Set<Relationship> rels = new HashSet<>();
    rels.add(REL_SUCCESS);
    rels.add(REL_FAILURE);
    relationships = Collections.unmodifiableSet(rels);
}
+
    /** Returns the immutable relationship set (success / failure) built in the static initializer. */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
    /** Returns the supported properties: the shared Mongo descriptors plus Ordered, Use transaction, and Character Set. */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ComponentLog logger = getLogger();
+
+        final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final WriteConcern writeConcern = clientService.getWriteConcern();
+
+        ClientSession clientSession = null;
+        try {
+            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+            // Read the contents of the FlowFile into a byte array
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            session.exportTo(flowFile, os);
+
+            // parse
+            final BsonArray updateItems = 
BsonArray.parse(os.toString(charset));
+
+            List<WriteModel<Document>> updateModels = new ArrayList<>();
+            for (Object item : updateItems) {
+                final BsonDocument updateItem = (BsonDocument) item;
+                if (updateItem.keySet().size() != 1) {
+                    logger.error("Invalid bulk-update: more than one type 
given {}", String.join(", ", updateItem.keySet()));

Review Comment:
   It would be helpful to include the FlowFile reference in this error message (e.g. pass `flowFile` as an additional argument to `logger.error`) so the failing FlowFile can be traced in the logs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to