This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c5ed5c51e9 NIFI-12224 Added Support for updateMany Method in PutMongo
c5ed5c51e9 is described below

commit c5ed5c51e93f5b1f9311b5a478e64b2f1320a9d4
Author: Umar Hussain <[email protected]>
AuthorDate: Sun Apr 7 02:47:03 2024 +0500

    NIFI-12224 Added Support for updateMany Method in PutMongo
    
    This closes #8610
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/mongodb/AbstractMongoProcessor.java |  62 +++-
 .../apache/nifi/processors/mongodb/PutMongo.java   |  93 ++++--
 .../nifi/processors/mongodb/PutMongoRecord.java    |  42 +--
 .../apache/nifi/processors/mongodb/PutMongoIT.java | 337 ++++++++++++++++-----
 .../nifi/processors/mongodb/PutMongoRecordIT.java  |   9 +-
 5 files changed, 387 insertions(+), 156 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index 9a5c8eb724..5d7b4d6843 100644
--- 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.DescribedValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -46,13 +47,13 @@ import java.io.ByteArrayInputStream;
 import java.io.UnsupportedEncodingException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
 public abstract class AbstractMongoProcessor extends AbstractProcessor {
+    public static final String ATTRIBUTE_MONGODB_UPDATE_MODE = 
"mongodb.update.mode";
+
     protected static final String JSON_TYPE_EXTENDED = "Extended";
     protected static final String JSON_TYPE_STANDARD   = "Standard";
     protected static final AllowableValue JSON_EXTENDED = new 
AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
@@ -156,14 +157,41 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final List<PropertyDescriptor> descriptors;
+    static final List<PropertyDescriptor> descriptors = List.of(
+            CLIENT_SERVICE,
+            DATABASE_NAME,
+            COLLECTION_NAME
+    );
+
+    public enum UpdateMethod implements DescribedValue {
+        UPDATE_ONE("one", "Update One", "Updates only the first document that 
matches the query."),
+        UPDATE_MANY("many", "Update Many", "Updates every document that 
matches the query."),
+        UPDATE_FF_ATTRIBUTE("flowfile-attribute", "Use '" + 
ATTRIBUTE_MONGODB_UPDATE_MODE + "' FlowFile attribute.",
+            "Use the value of the '" + ATTRIBUTE_MONGODB_UPDATE_MODE + "' 
attribute of the incoming FlowFile. Acceptable values are 'one' and 'many'.");
+        private final String value;
+        private final String displayName;
+        private final String description;
+
+        UpdateMethod(final String value, final String displayName, final 
String description) {
+            this.value = value;
+            this.displayName = displayName;
+            this.description = description;
+        }
 
-    static {
-        List<PropertyDescriptor> _temp = new ArrayList<>();
-        _temp.add(CLIENT_SERVICE);
-        _temp.add(DATABASE_NAME);
-        _temp.add(COLLECTION_NAME);
-        descriptors = Collections.unmodifiableList(_temp);
+        @Override
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public String getDisplayName() {
+            return displayName;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
     }
 
     protected ObjectMapper objectMapper;
@@ -235,4 +263,20 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
             objectMapper.setDateFormat(df);
         }
     }
+
+    /**
+     * Checks if given update mode option matches for the incoming flow file
+     * @param updateMethodToMatch the value against which processor's mode is 
compared
+     * @param configuredUpdateMethod the value coming from running processor
+     * @param flowFile incoming flow file to extract processor mode
+     * @return true if the incoming files update mode matches with 
updateMethodToMatch
+     */
+    protected boolean updateModeMatches(
+        UpdateMethod updateMethodToMatch, UpdateMethod configuredUpdateMethod, 
FlowFile flowFile) {
+
+        return updateMethodToMatch == configuredUpdateMethod
+            || (UpdateMethod.UPDATE_FF_ATTRIBUTE == configuredUpdateMethod
+                    && 
updateMethodToMatch.getValue().equalsIgnoreCase(flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE)));
+    }
+
 }
diff --git 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index 526c041742..e6e8d23c14 100644
--- 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++ 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -21,10 +21,13 @@ import com.mongodb.WriteConcern;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.model.ReplaceOptions;
 import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.result.UpdateResult;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SystemResource;
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
@@ -42,6 +45,7 @@ import org.apache.nifi.processor.util.JsonValidator;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StringUtils;
+import org.bson.BsonValue;
 import org.bson.Document;
 import org.bson.types.ObjectId;
 
@@ -49,7 +53,6 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,12 +61,21 @@ import java.util.Set;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Writes the contents of a FlowFile to MongoDB")
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttributes({
+    @WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, 
description = "The match count from result if update/upsert is performed, 
otherwise not set."),
+    @WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, 
description = "The modify count from result if update/upsert is performed, 
otherwise not set."),
+    @WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPSERT_ID, description = 
"The '_id' hex value if upsert is performed, otherwise not set.")
+})
 public class PutMongo extends AbstractMongoProcessor {
     static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
             .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
     static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
             .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
 
+    static final String ATTRIBUTE_UPDATE_MATCH_COUNT = 
"mongo.put.update.match.count";
+    static final String ATTRIBUTE_UPDATE_MODIFY_COUNT = 
"mongo.put.update.modify.count";
+    static final String ATTRIBUTE_UPSERT_ID = "mongo.put.upsert.id";
+
     static final String MODE_INSERT = "insert";
     static final String MODE_UPDATE = "update";
 
@@ -82,39 +94,50 @@ public class PutMongo extends AbstractMongoProcessor {
         .description("When true, inserts a document if no document matches the 
update query criteria; this property is valid only when using update mode, "
                 + "otherwise it is ignored")
         .required(true)
+        .dependsOn(MODE, MODE_UPDATE)
         .allowableValues("true", "false")
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
         .defaultValue("false")
         .build();
     static final PropertyDescriptor UPDATE_QUERY_KEY = new 
PropertyDescriptor.Builder()
         .name("Update Query Key")
-        .description("Key name used to build the update query criteria; this 
property is valid only when using update mode, "
-                + "otherwise it is ignored. Example: _id")
+        .description("One or more comma-separated document key names used to 
build the update query criteria, such as _id")
         .required(false)
+        .dependsOn(MODE, MODE_UPDATE)
         .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
     static final PropertyDescriptor UPDATE_QUERY = new 
PropertyDescriptor.Builder()
         .name("putmongo-update-query")
         .displayName("Update Query")
-        .description("Specify a full MongoDB query to be used for the lookup 
query to do an update/upsert.")
+        .description("Specify a full MongoDB query to be used for the lookup 
query to do an update/upsert. NOTE: this field is ignored if the '%s' value is 
not empty."
+            .formatted(UPDATE_QUERY_KEY.getDisplayName()))
         .required(false)
+        .dependsOn(MODE, MODE_UPDATE)
         .addValidator(JsonValidator.INSTANCE)
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor UPDATE_MODE = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor UPDATE_OPERATION_MODE = new 
PropertyDescriptor.Builder()
         .displayName("Update Mode")
         .name("put-mongo-update-mode")
         .required(true)
+        .dependsOn(MODE, MODE_UPDATE)
         .allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS)
-        .defaultValue(UPDATE_WITH_DOC.getValue())
+        .defaultValue(UPDATE_WITH_DOC)
         .description("Choose an update mode. You can either supply a JSON 
document to use as a direct replacement " +
                 "or specify a document that contains update operators like 
$set, $unset, and $inc. " +
                 "When Operators mode is enabled, the flowfile content is 
expected to be the operator part " +
                 "for example: {$set:{\"key\": 
\"value\"},$inc:{\"count\":1234}} and the update query will come " +
                 "from the configured Update Query property.")
          .build();
+    static final PropertyDescriptor UPDATE_METHOD = new 
PropertyDescriptor.Builder()
+        .name("Update Method")
+        .dependsOn(UPDATE_OPERATION_MODE, UPDATE_WITH_OPERATORS)
+        .description("MongoDB method for running collection update operations, 
such as updateOne or updateMany")
+        .allowableValues(UpdateMethod.class)
+        .defaultValue(UpdateMethod.UPDATE_ONE)
+        .build();
     static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
         .name("Character Set")
         .description("The Character Set in which the data is encoded")
@@ -123,24 +146,20 @@ public class PutMongo extends AbstractMongoProcessor {
         .defaultValue("UTF-8")
         .build();
 
-    private final static Set<Relationship> relationships;
+    private final static Set<Relationship> relationships = Set.of(REL_SUCCESS, 
REL_FAILURE);
+
     private final static List<PropertyDescriptor> propertyDescriptors;
 
     static {
-        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
-        _propertyDescriptors.addAll(descriptors);
+      List<PropertyDescriptor> _propertyDescriptors = new 
ArrayList<>(descriptors);
         _propertyDescriptors.add(MODE);
         _propertyDescriptors.add(UPSERT);
         _propertyDescriptors.add(UPDATE_QUERY_KEY);
         _propertyDescriptors.add(UPDATE_QUERY);
-        _propertyDescriptors.add(UPDATE_MODE);
+        _propertyDescriptors.add(UPDATE_OPERATION_MODE);
+        _propertyDescriptors.add(UPDATE_METHOD);
         _propertyDescriptors.add(CHARACTER_SET);
         propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
-
-        final Set<Relationship> _relationships = new HashSet<>();
-        _relationships.add(REL_SUCCESS);
-        _relationships.add(REL_FAILURE);
-        relationships = Collections.unmodifiableSet(_relationships);
     }
 
     @Override
@@ -183,7 +202,7 @@ public class PutMongo extends AbstractMongoProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final FlowFile flowFile = session.get();
+        FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }
@@ -191,8 +210,8 @@ public class PutMongo extends AbstractMongoProcessor {
         final ComponentLog logger = getLogger();
 
         final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
-        final String mode = context.getProperty(MODE).getValue();
-        final String updateMode = context.getProperty(UPDATE_MODE).getValue();
+        final String processorMode = context.getProperty(MODE).getValue();
+        final String updateOperationMode = 
context.getProperty(UPDATE_OPERATION_MODE).getValue();
         final WriteConcern writeConcern = clientService.getWriteConcern();
 
         try {
@@ -202,10 +221,10 @@ public class PutMongo extends AbstractMongoProcessor {
             session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, 
true));
 
             // parse
-            final Object doc = (mode.equals(MODE_INSERT) || 
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+            final Object doc = (processorMode.equals(MODE_INSERT) || 
(processorMode.equals(MODE_UPDATE) && 
updateOperationMode.equals(UPDATE_WITH_DOC.getValue())))
                     ? Document.parse(new String(content, charset)) : 
BasicDBObject.parse(new String(content, charset));
 
-            if (MODE_INSERT.equalsIgnoreCase(mode)) {
+            if (MODE_INSERT.equals(processorMode)) {
                 collection.insertOne((Document) doc);
                 logger.info("inserted {} into MongoDB", flowFile);
             } else {
@@ -213,21 +232,39 @@ public class PutMongo extends AbstractMongoProcessor {
                 final boolean upsert = context.getProperty(UPSERT).asBoolean();
                 final String updateKey = 
context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                 final String filterQuery = 
context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
-                final Document query;
+                final Document updateQuery;
 
-                if (!StringUtils.isBlank(updateKey)) {
-                    query = parseUpdateKey(updateKey, (Map) doc);
+                if (StringUtils.isNotBlank(updateKey)) {
+                    updateQuery = parseUpdateKey(updateKey, (Map) doc);
                     removeUpdateKeys(updateKey, (Map) doc);
                 } else {
-                    query = Document.parse(filterQuery);
+                    updateQuery = Document.parse(filterQuery);
                 }
-
-                if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
-                    collection.replaceOne(query, (Document) doc, new 
ReplaceOptions().upsert(upsert));
+                UpdateResult updateResult;
+                if (updateOperationMode.equals(UPDATE_WITH_DOC.getValue())) {
+                    updateResult = collection.replaceOne(updateQuery, 
(Document) doc, new ReplaceOptions().upsert(upsert));
                 } else {
                     BasicDBObject update = (BasicDBObject) doc;
                     update.remove(updateKey);
-                    collection.updateOne(query, update, new 
UpdateOptions().upsert(upsert));
+                    UpdateOptions updateOptions = new 
UpdateOptions().upsert(upsert);
+                    UpdateMethod updateQueryMode = 
context.getProperty(UPDATE_METHOD).asAllowableValue(UpdateMethod.class);
+
+                    if (this.updateModeMatches(UpdateMethod.UPDATE_ONE, 
updateQueryMode, flowFile)) {
+                        updateResult = collection.updateOne(updateQuery, 
update, updateOptions);
+                    } else if 
(this.updateModeMatches(UpdateMethod.UPDATE_MANY, updateQueryMode, flowFile)) {
+                        updateResult = collection.updateMany(updateQuery, 
update, updateOptions);
+                    } else {
+                        String flowfileUpdateMode = 
flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE);
+                        throw new ProcessException("Unrecognized '" + 
ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
+                    }
+                }
+
+                flowFile = session.putAttribute(flowFile, 
ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
+                flowFile = session.putAttribute(flowFile, 
ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount()));
+                BsonValue upsertedId = updateResult.getUpsertedId();
+                if (upsertedId != null) {
+                    String id = upsertedId.isString() ? 
upsertedId.asString().getValue() : 
upsertedId.asObjectId().getValue().toString();
+                    flowFile = session.putAttribute(flowFile, 
ATTRIBUTE_UPSERT_ID, id);
                 }
                 logger.info("updated {} into MongoDB", flowFile);
             }
diff --git 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index 3679d89375..3cbd70f95d 100644
--- 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++ 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -30,7 +30,6 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
@@ -54,7 +53,6 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,23 +66,17 @@ import java.util.Set;
         "by the \"Batch Size\" configuration property. This value should be 
set to a reasonable size to ensure " +
         "that MongoDB is not overloaded with too many operations at once.")
 @ReadsAttribute(
-    attribute = PutMongoRecord.MONGODB_UPDATE_MODE,
+    attribute = AbstractMongoProcessor.ATTRIBUTE_MONGODB_UPDATE_MODE,
     description = "Configurable parameter for controlling update mode on a 
per-flowfile basis." +
         " Acceptable values are 'one' and 'many' and controls whether a single 
incoming record should update a single or multiple Mongo documents."
 )
 public class PutMongoRecord extends AbstractMongoProcessor {
-    static final String MONGODB_UPDATE_MODE = "mongodb.update.mode";
 
     static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
             .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
     static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
             .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
 
-    static final AllowableValue UPDATE_ONE = new AllowableValue("one", "Update 
One", "Updates only the first document that matches the query.");
-    static final AllowableValue UPDATE_MANY = new AllowableValue("many", 
"Update Many", "Updates every document that matches the query.");
-    static final AllowableValue UPDATE_FF_ATTRIBUTE = new 
AllowableValue("flowfile-attribute", "Use '" + MONGODB_UPDATE_MODE + "' 
flowfile attribute.",
-        "Use the value of the '" + MONGODB_UPDATE_MODE + "' attribute of the 
incoming flowfile. Acceptable values are 'one' and 'many'.");
-
     static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
             .name("record-reader")
             .displayName("Record Reader")
@@ -141,8 +133,8 @@ public class PutMongoRecord extends AbstractMongoProcessor {
         .displayName("Update Mode")
         .dependsOn(UPDATE_KEY_FIELDS)
         .description("Choose between updating a single document or multiple 
documents per incoming record.")
-        .allowableValues(UPDATE_ONE, UPDATE_MANY, UPDATE_FF_ATTRIBUTE)
-        .defaultValue(UPDATE_ONE.getValue())
+        .allowableValues(UpdateMethod.class)
+        .defaultValue(UpdateMethod.UPDATE_ONE)
         .build();
 
     private final static Set<Relationship> relationships;
@@ -159,10 +151,7 @@ public class PutMongoRecord extends AbstractMongoProcessor 
{
         _propertyDescriptors.add(UPDATE_MODE);
         propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
 
-        final Set<Relationship> _relationships = new HashSet<>();
-        _relationships.add(REL_SUCCESS);
-        _relationships.add(REL_FAILURE);
-        relationships = Collections.unmodifiableSet(_relationships);
+        relationships = Set.of(REL_SUCCESS, REL_FAILURE);
     }
 
     @Override
@@ -230,22 +219,22 @@ public class PutMongoRecord extends 
AbstractMongoProcessor {
                 WriteModel<Document> writeModel;
                 if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
                     Bson[] filters = 
buildFilters(updateKeyFieldPathToFieldChain, readyToUpsert);
-
-                    if (updateModeMatches(UPDATE_ONE.getValue(), context, 
flowFile)) {
+                    UpdateMethod mongoUpdateMode = 
context.getProperty(UPDATE_MODE).asAllowableValue(UpdateMethod.class);
+                    if (this.updateModeMatches(UpdateMethod.UPDATE_ONE, 
mongoUpdateMode, flowFile)) {
                         writeModel = new UpdateOneModel<>(
                             Filters.and(filters),
                             new Document("$set", readyToUpsert),
                             new UpdateOptions().upsert(true)
                         );
-                    } else if (updateModeMatches(UPDATE_MANY.getValue(), 
context, flowFile)) {
+                    } else if 
(this.updateModeMatches(UpdateMethod.UPDATE_MANY, mongoUpdateMode, flowFile)) {
                         writeModel = new UpdateManyModel<>(
                             Filters.and(filters),
                             new Document("$set", readyToUpsert),
                             new UpdateOptions().upsert(true)
                         );
                     } else {
-                        String flowfileUpdateMode = 
flowFile.getAttribute(MONGODB_UPDATE_MODE);
-                        throw new ProcessException("Unrecognized '" + 
MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
+                        String flowfileUpdateMode = 
flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE);
+                        throw new ProcessException("Unrecognized '" + 
ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
                     }
                 } else {
                     writeModel = new InsertOneModel<>(readyToUpsert);
@@ -334,17 +323,4 @@ public class PutMongoRecord extends AbstractMongoProcessor 
{
 
         return filters;
     }
-
-    private boolean updateModeMatches(String updateModeToMatch, ProcessContext 
context, FlowFile flowFile) {
-        String updateMode = context.getProperty(UPDATE_MODE).getValue();
-
-        boolean updateModeMatches = updateMode.equals(updateModeToMatch)
-            ||
-            (
-                updateMode.equals(UPDATE_FF_ATTRIBUTE.getValue())
-                    && 
updateModeToMatch.equalsIgnoreCase(flowFile.getAttribute(MONGODB_UPDATE_MODE))
-            );
-
-        return updateModeMatches;
-    }
 }
diff --git 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
index 413af59c26..d8c2849dbc 100644
--- 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
+++ 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.mongodb;
 import com.mongodb.client.MongoCursor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.mongodb.AbstractMongoProcessor.UpdateMethod;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
@@ -108,62 +109,125 @@ public class PutMongoIT extends MongoWriteTestBase {
     }
 
     @Test
-    public void testBlankUpdateKey() throws Exception {
+    public void testBlankUpdateKeyInUpdateMode() throws Exception {
         TestRunner runner = init(PutMongo.class);
+        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
         runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "  ");
         runner.assertNotValid();
     }
 
     @Test
-    public void testUpdateQuery() throws Exception {
+    public void testUpdateOneWithOperatorByQuery() throws Exception {
         TestRunner runner = init(PutMongo.class);
         Document document = new Document()
             .append("name", "John Smith")
             .append("department", "Engineering");
         collection.insertOne(document);
-        String updateBody = "{\n" +
-            "\t\"$set\": {\n" +
-            "\t\t\"email\": \"[email protected]\",\n" +
-            "\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
-            "\t},\n" +
-            "\t\"$inc\": {\n" +
-            "\t\t\"writes\": 1\n" +
-            "\t}\n" +
-            "}";
+        Document updateBody = new Document(Map.of(
+            "$set", Map.of(
+                "email", "[email protected]",
+                "grade", "Sr. Principle Eng."
+            ),
+            "$inc", Map.of(
+                "writes", 1
+            )
+        ));
         Map<String, String> attr = new HashMap<>();
         attr.put("mongo.update.query", document.toJson());
-        runner.setProperty(PutMongo.UPDATE_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
         runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
         runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}");
         runner.setValidateExpressionUsage(true);
-        runner.enqueue(updateBody, attr);
-        updateTests(runner, document);
+        runner.enqueue(updateBody.toJson(), attr);
+        updateOneTests(runner, document);
+    }
+    @Test
+    public void testUpdateManyWithOperatorByQuery() throws Exception {
+        TestRunner runner = init(PutMongo.class);
+        Map<String, String> docKeys = Map.of(
+            "name", "John Smith",
+            "department", "Engineering"
+        );
+
+        collection.insertOne(new Document(docKeys));
+        collection.insertOne(new Document(docKeys));
+        Document updateBody = new Document(Map.of(
+            "$set", Map.of(
+                "email", "[email protected]",
+                "grade", "Sr. Principle Eng."
+            ),
+            "$inc", Map.of(
+                "writes", 1
+            )
+        ));
+        Document search = new Document(docKeys);
+        Map<String, String> attr = new HashMap<>();
+        attr.put("mongo.update.query", search.toJson());
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.UPDATE_METHOD, UpdateMethod.UPDATE_MANY);
+        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+        runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}");
+        runner.setValidateExpressionUsage(true);
+        runner.enqueue(updateBody.toJson(), attr);
+        updateManyTests(runner, search, 2);
     }
 
     @Test
-    public void testUpdateBySimpleKey() throws Exception {
+    public void testUpdateOneBySimpleKey() throws Exception {
         TestRunner runner = init(PutMongo.class);
         Document document = new Document()
             .append("name", "John Smith")
             .append("department", "Engineering");
         collection.insertOne(document);
 
-        String updateBody = "{\n" +
-            "\t\"name\": \"John Smith\",\n" +
-            "\t\"$set\": {\n" +
-            "\t\t\"email\": \"[email protected]\",\n" +
-            "\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
-            "\t},\n" +
-            "\t\"$inc\": {\n" +
-            "\t\t\"writes\": 1\n" +
-            "\t}\n" +
-            "}";
+        Document updateBody = new Document(Map.of(
+            "name", "John Smith",
+            "$set", Map.of(
+                "email", "[email protected]",
+                "grade", "Sr. Principle Eng."
+            ),
+            "$inc", Map.of(
+                "writes", 1
+            )
+        ));
+        runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name");
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+        runner.setValidateExpressionUsage(true);
+        runner.enqueue(updateBody.toJson());
+        updateOneTests(runner, document);
+    }
+
+
+    @Test
+    public void testUpdateManyWithOperatorBySimpleKey() throws Exception {
+        TestRunner runner = init(PutMongo.class);
+        Map<String, String> docKeys = Map.of(
+            "name", "John Smith",
+            "department", "Engineering"
+        );
+
+        collection.insertOne(new Document(docKeys));
+        collection.insertOne(new Document(docKeys));
+
+        Document updateBody = new Document(Map.of(
+            "name", "John Smith",
+            "$set", Map.of(
+                "email", "[email protected]",
+                "grade", "Sr. Principle Eng."
+            ),
+            "$inc", Map.of(
+                "writes", 1
+            )
+        ));
+        Document search = new Document(docKeys);
         runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name");
-        runner.setProperty(PutMongo.UPDATE_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.UPDATE_METHOD, UpdateMethod.UPDATE_MANY);
         runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
         runner.setValidateExpressionUsage(true);
-        runner.enqueue(updateBody);
-        updateTests(runner, document);
+        runner.enqueue(updateBody.toJson());
+        updateManyTests(runner, search, 2);
     }
 
     @Test
@@ -186,23 +250,28 @@ public class PutMongoIT extends MongoWriteTestBase {
                 .append("name", "John Smith")
                 .append("department", "Engineering");
         collection.insertOne(document);
-        String updateBody = "{\n" +
-                "\t\"name\": \"John Smith\",\n" +
-                "\t\"department\": \"Engineering\",\n" +
-                "\t\"contacts\": {\n" +
-                "\t\t\"phone\": \"555-555-5555\",\n" +
-                "\t\t\"email\": \"[email protected]\",\n" +
-                "\t\t\"twitter\": \"@JohnSmith\"\n" +
-                "\t}\n" +
-                "}";
-        runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_DOC);
+        Document updateBody = new Document(Map.of(
+            "name", "John Smith",
+            "department", "Engineering",
+            "contacts", Map.of(
+                "phone", "555-555-5555",
+                "email", "[email protected]",
+                "twitter", "@JohnSmith"
+            )
+        ));
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_DOC);
         runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
         runner.setValidateExpressionUsage(true);
-        runner.enqueue(updateBody);
+        runner.enqueue(updateBody.toJson());
         runner.run();
         runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
         runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
 
+        MockFlowFile out = 
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).getFirst();
+        out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, 
String.valueOf(1));
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, 
String.valueOf(1));
+
         MongoCursor<Document> cursor = collection.find(document).iterator();
         Document found = cursor.next();
         assertEquals(found.get("name"), document.get("name"));
@@ -216,7 +285,7 @@ public class PutMongoIT extends MongoWriteTestBase {
     }
 
     @Test
-    public void testUpdateByComplexKey() throws Exception {
+    public void testUpdateOneByComplexKey() throws Exception {
         TestRunner runner = init(PutMongo.class);
         Document document = new Document()
                 .append("name", "John Smith")
@@ -224,25 +293,30 @@ public class PutMongoIT extends MongoWriteTestBase {
                 .append("contacts", new Document().append("email", 
"[email protected]")
                 .append("phone", "555-555-5555"));
         collection.insertOne(document);
-        String updateBody = "{\n" +
-                "\t\"contacts.phone\": \"555-555-5555\",\n" +
-                "\t\"contacts.email\": \"[email protected]\",\n" +
-                "\t\"$set\": {\n" +
-                "\t\t\"contacts.twitter\": \"@JohnSmith\"\n" +
-                "\t},\n" +
-                "\t\"$inc\": {\n" +
-                "\t\t\"writes\": 1\n" +
-                "\t}\n" +
-                "}";
+        Document updateBody = new Document(Map.of(
+            "contacts.phone", "555-555-5555",
+            "contacts.email", "[email protected]",
+            "$set", Map.of(
+                "contacts.twitter", "@JohnSmith"
+            ),
+            "$inc", Map.of(
+                "writes", 1
+            )
+        ));
         runner.setProperty(PutMongo.UPDATE_QUERY_KEY, 
"contacts.phone,contacts.email");
-        runner.setProperty(PutMongo.UPDATE_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
         runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
         runner.setValidateExpressionUsage(true);
-        runner.enqueue(updateBody);
+        runner.enqueue(updateBody.toJson());
         runner.run();
         runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
         runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
 
+        MockFlowFile out = 
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).getFirst();
+        out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, 
String.valueOf(1));
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, 
String.valueOf(1));
+
         MongoCursor<Document> iterator = collection.find(new Document("name", 
"John Smith")).iterator();
         assertTrue(iterator.hasNext(), "Document did not come back.");
         Document val = iterator.next();
@@ -252,17 +326,75 @@ public class PutMongoIT extends MongoWriteTestBase {
         assertTrue(val.containsKey("writes") && val.get("writes").equals(1));
     }
 
-    private void updateTests(TestRunner runner, Document document) {
+    @Test
+    public void testUpdateManyWithOperatorByComplexKey() throws Exception {
+        TestRunner runner = init(PutMongo.class);
+        Map<String, Object> data = Map.of("name", "John Smith",
+            "department", "Engineering",
+            "contacts", Map.of(
+                "email", "[email protected]",
+                "phone", "555-555-5555"
+                )
+        );
+        collection.insertOne(new Document(data));
+        collection.insertOne(new Document(data));
+        Document updateBody = new Document(Map.of(
+            "contacts.phone", "555-555-5555",
+            "contacts.email", "[email protected]",
+            "$set", Map.of(
+                "contacts.twitter", "@JohnSmith"
+            ),
+            "$inc", Map.of(
+                "writes", 1
+            )
+        ));
+        runner.setProperty(PutMongo.UPDATE_QUERY_KEY, 
"contacts.phone,contacts.email");
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.UPDATE_METHOD, UpdateMethod.UPDATE_MANY);
+        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+        runner.setValidateExpressionUsage(true);
+        runner.enqueue(updateBody.toJson());
         runner.run();
         runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
         runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
 
+        MockFlowFile out = 
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).getFirst();
+        out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, 
String.valueOf(2));
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, 
String.valueOf(2));
+
+        MongoCursor<Document> iterator = collection.find(new Document("name", 
"John Smith")).iterator();
+        for (int i = 1; i <= 2; i++) {
+            assertTrue(iterator.hasNext(), "Document %d did not come 
back.".formatted(i));
+            Document val = iterator.next();
+            Map contacts = (Map) val.get("contacts");
+            assertNotNull(contacts, "Document %d's contacts 
null".formatted(i));
+            assertTrue(contacts.containsKey("twitter") && 
contacts.get("twitter").equals("@JohnSmith"), "Document %d's twitter 
invalid".formatted(i));
+            assertTrue(val.containsKey("writes") && 
val.get("writes").equals(1), "Document %d's writes invalid".formatted(i));
+
+        }
+    }
+
+    private void updateOneTests(TestRunner runner, Document document) {
+        updateManyTests(runner, document, 1);
+    }
+
+    private void updateManyTests(TestRunner runner, Document document, int 
updateCount) {
+        runner.run();
+        runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
         MongoCursor<Document> iterator = collection.find(document).iterator();
-        assertTrue(iterator.hasNext(), "Document did not come back.");
-        Document val = iterator.next();
-        assertTrue(val.containsKey("email") && 
val.get("email").equals("[email protected]"));
-        assertTrue(val.containsKey("grade") && val.get("grade").equals("Sr. 
Principle Eng."));
-        assertTrue(val.containsKey("writes") && val.get("writes").equals(1));
+        MockFlowFile out = 
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).getFirst();
+        out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, 
String.valueOf(updateCount));
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, 
String.valueOf(updateCount));
+        for (int i = 1; i <= updateCount; i++) {
+            assertTrue(iterator.hasNext(), "Document number %s did not come 
back.".formatted(i));
+            Document val = iterator.next();
+            assertTrue(val.containsKey("email") && 
val.get("email").equals("[email protected]"));
+            assertTrue(val.containsKey("grade") && 
val.get("grade").equals("Sr. Principle Eng."));
+            assertTrue(val.containsKey("writes") && 
val.get("writes").equals(1));
+        }
     }
 
     @Test
@@ -341,13 +473,16 @@ public class PutMongoIT extends MongoWriteTestBase {
         Document doc = DOCUMENTS.get(0);
         byte[] bytes = documentToByteArray(doc);
 
-        runner.setProperty(PutMongo.MODE, "update");
+        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
         runner.enqueue(bytes);
         runner.run();
 
         runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
         MockFlowFile out = 
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
         out.assertContentEquals(bytes);
+        out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, 
String.valueOf(0));
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, 
String.valueOf(0));
 
         // nothing was in collection, so nothing to update since upsert 
defaults to false
         assertEquals(0, collection.countDocuments());
@@ -364,7 +499,7 @@ public class PutMongoIT extends MongoWriteTestBase {
         Document doc = DOCUMENTS.get(0);
         byte[] bytes = documentToByteArray(doc);
 
-        runner.setProperty(PutMongo.MODE, "update");
+        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
         runner.setProperty(PutMongo.UPSERT, "true");
         runner.enqueue(bytes);
         runner.run();
@@ -373,6 +508,10 @@ public class PutMongoIT extends MongoWriteTestBase {
         MockFlowFile out = 
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
         out.assertContentEquals(bytes);
 
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPSERT_ID, 
doc.getString("_id"));
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, 
String.valueOf(0));
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, 
String.valueOf(0));
+
         // verify 1 doc inserted into the collection
         assertEquals(1, collection.countDocuments());
         assertEquals(doc, collection.find().first());
@@ -393,6 +532,10 @@ public class PutMongoIT extends MongoWriteTestBase {
         MockFlowFile out = 
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
         out.assertContentEquals(bytes);
 
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPSERT_ID, 
oidDocument.getObjectId("_id").toString());
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, 
String.valueOf(0));
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, 
String.valueOf(0));
+
         // verify 1 doc inserted into the collection
         assertEquals(1, collection.countDocuments());
         assertEquals(oidDocument, collection.find().first());
@@ -414,14 +557,16 @@ public class PutMongoIT extends MongoWriteTestBase {
 
         byte[] bytes = documentToByteArray(doc);
 
-        runner.setProperty(PutMongo.MODE, "update");
+        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
         runner.enqueue(bytes);
         runner.run();
 
         runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
         MockFlowFile out = 
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
         out.assertContentEquals(bytes);
-
+        out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, 
String.valueOf(1));
+        out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, 
String.valueOf(1));
         assertEquals(1, collection.countDocuments());
         assertEquals(doc, collection.find().first());
     }
@@ -429,22 +574,35 @@ public class PutMongoIT extends MongoWriteTestBase {
     @Test
     public void testUpsertWithOperators() throws Exception {
         TestRunner runner = init(PutMongo.class);
-        String upsert = "{\n" +
-                "  \"_id\": \"Test\",\n" +
-                "  \"$push\": {\n" +
-                "     \"testArr\": { \"msg\": \"Hi\" }\n" +
-                "  }\n" +
-                "}";
-        runner.setProperty(PutMongo.UPDATE_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        Document upsert = new Document(Map.of(
+            "_id", "Test",
+            "$push", Map.of(
+                "testArr", Map.of(
+                    "msg", "Hi"
+                )
+            )
+        ));
+
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
         runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
         runner.setProperty(PutMongo.MODE, "update");
         runner.setProperty(PutMongo.UPSERT, "true");
         for (int x = 0; x < 3; x++) {
-            runner.enqueue(upsert.getBytes());
+            runner.enqueue(upsert.toJson());
         }
         runner.run(3, true, true);
         runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
         runner.assertTransferCount(PutMongo.REL_SUCCESS, 3);
+        List<MockFlowFile> flowFilesForRelationship = 
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
+        MockFlowFile upsertOutput = flowFilesForRelationship.removeFirst();
+        upsertOutput.assertAttributeEquals(PutMongo.ATTRIBUTE_UPSERT_ID, 
"Test");
+
+        // test next flow files for update attributes
+        for (int i = 0; i < flowFilesForRelationship.size(); i++) {
+            
flowFilesForRelationship.get(i).assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+            
flowFilesForRelationship.get(i).assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
 String.valueOf(1));
+            
flowFilesForRelationship.get(i).assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
 String.valueOf(1));
+        }
 
         Document query = new Document("_id", "Test");
         Document result = collection.find(query).first();
@@ -454,7 +612,7 @@ public class PutMongoIT extends MongoWriteTestBase {
         for (int index = 0; index < array.size(); index++) {
             Document doc = (Document) array.get(index);
             String msg = doc.getString("msg");
-            assertNotNull("Msg was null", msg);
+            assertNotNull(msg, "Msg was null");
             assertEquals(msg, "Hi", "Msg had wrong value");
         }
     }
@@ -477,26 +635,42 @@ public class PutMongoIT extends MongoWriteTestBase {
     @Test
     public void testNiFi_4759_Regressions() throws Exception {
         TestRunner runner = init(PutMongo.class);
-        String[] upserts = new String[]{
-                "{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } 
}",
-                "{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\": 
\"Hello, world\" } }",
-                "{ \"updateKey\": \"12345\", \"$set\": { \"msg\": \"Hello, 
world\" } }"
-        };
+
+        List<Document> upserts = List.of(
+            new Document(Map.of(
+                "_id", "12345",
+                "$set", Map.of(
+                    "msg", "Hello, world"
+                )
+            )),
+            new Document(Map.of(
+                "_id", "5a5617b9c1f5de6d8276e87d",
+                "$set", Map.of(
+                    "msg", "Hello, world"
+                )
+            )),
+            new Document(Map.of(
+                "updateKey", "12345",
+                "$set", Map.of(
+                    "msg", "Hello, world"
+                )
+            ))
+        );
 
         String[] updateKeyProps = new String[] {"_id", "_id", "updateKey"};
         Object[] updateKeys = new Object[] {"12345", new 
ObjectId("5a5617b9c1f5de6d8276e87d"), "12345"};
-        int index = 0;
 
-        runner.setProperty(PutMongo.UPDATE_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
-        runner.setProperty(PutMongo.MODE, "update");
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
         runner.setProperty(PutMongo.UPSERT, "true");
 
         final int LIMIT = 2;
 
-        for (String upsert : upserts) {
+        for (int index = 0; index < upserts.size(); index++) {
+            Document upsert = upserts.get(index);
             runner.setProperty(PutMongo.UPDATE_QUERY_KEY, 
updateKeyProps[index]);
             for (int x = 0; x < LIMIT; x++) {
-                runner.enqueue(upsert);
+                runner.enqueue(upsert.toJson());
             }
             runner.run(LIMIT, true, true);
             runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
@@ -508,7 +682,6 @@ public class PutMongoIT extends MongoWriteTestBase {
             assertNotNull(result, "Result was null");
             assertEquals(1, collection.countDocuments(query), "Count was 
wrong");
             runner.clearTransferState();
-            index++;
         }
     }
 }
diff --git 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
index e63d89b978..1942807c1d 100644
--- 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
+++ 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
@@ -24,6 +24,7 @@ import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.mongodb.MongoDBClientService;
 import org.apache.nifi.mongodb.MongoDBControllerService;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.mongodb.AbstractMongoProcessor.UpdateMethod;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
@@ -423,7 +424,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
         TestRunner updateRunner = init();
 
         updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
-        updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, 
PutMongoRecord.UPDATE_MANY.getValue());
+        updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, 
UpdateMethod.UPDATE_MANY.getValue());
 
         recordReader.addSchemaField("team", RecordFieldType.STRING);
         recordReader.addSchemaField("color", RecordFieldType.STRING);
@@ -486,7 +487,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
         TestRunner updateRunner = init();
 
         updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
-        updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, 
PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+        updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, 
UpdateMethod.UPDATE_FF_ATTRIBUTE.getValue());
 
         recordReader.addSchemaField("team", RecordFieldType.STRING);
         recordReader.addSchemaField("color", RecordFieldType.STRING);
@@ -526,7 +527,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
 
             MockFlowFile flowFile = new MockFlowFile(1);
             flowFile.putAttributes(new HashMap<String, String>() {{
-                put(PutMongoRecord.MONGODB_UPDATE_MODE, "many");
+                put(AbstractMongoProcessor.ATTRIBUTE_MONGODB_UPDATE_MODE, 
"many");
             }});
             updateRunner.enqueue(flowFile);
             updateRunner.run();
@@ -551,7 +552,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
         TestRunner runner = init();
 
         runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
-        runner.setProperty(PutMongoRecord.UPDATE_MODE, 
PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+        runner.setProperty(PutMongoRecord.UPDATE_MODE, 
UpdateMethod.UPDATE_FF_ATTRIBUTE.getValue());
 
         recordReader.addSchemaField("team", RecordFieldType.STRING);
         recordReader.addSchemaField("color", RecordFieldType.STRING);

Reply via email to