This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 001f353522 NIFI-15184 Enhanced PutMongo to handle id Field values of 
various types correctly (#10497)
001f353522 is described below

commit 001f353522266acca7e873bc4163713e89d7bfd0
Author: Ravinarayan Singh <[email protected]>
AuthorDate: Thu Nov 6 12:49:09 2025 -0800

    NIFI-15184 Enhanced PutMongo to handle id Field values of various types 
correctly (#10497)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/processors/mongodb/PutMongo.java   | 21 ++++++++----
 .../apache/nifi/processors/mongodb/PutMongoIT.java | 38 ++++++++++++++++++++++
 .../nifi/processors/mongodb/PutMongoTest.java      |  1 +
 3 files changed, 54 insertions(+), 6 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index 46cf06216a..20f77955ee 100644
--- 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++ 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -265,7 +265,15 @@ public class PutMongo extends AbstractMongoProcessor {
                 flowFile = session.putAttribute(flowFile, 
ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount()));
                 BsonValue upsertedId = updateResult.getUpsertedId();
                 if (upsertedId != null) {
-                    String id = upsertedId.isString() ? 
upsertedId.asString().getValue() : 
upsertedId.asObjectId().getValue().toString();
+                    final String id;
+                    if (upsertedId.isString()) {
+                        id = upsertedId.asString().getValue();
+                    } else if (upsertedId.isObjectId()) {
+                        id = upsertedId.asObjectId().getValue().toString();
+                    } else {
+                        // Fallback for non-String/ObjectId identifiers (e.g., 
Document, Int32)
+                        id = upsertedId.toString();
+                    }
                     flowFile = session.putAttribute(flowFile, 
ATTRIBUTE_UPSERT_ID, id);
                 }
                 logger.info("updated {} into MongoDB", flowFile);
@@ -299,12 +307,13 @@ public class PutMongo extends AbstractMongoProcessor {
     private Document parseUpdateKey(String updateKey, Map doc) {
         Document retVal;
         if (updateKey.equals("_id")) {
-            if (doc.get("_id") instanceof ObjectId) {
-                retVal = new Document("_id", doc.get("_id"));
-            } else if (ObjectId.isValid((String) doc.get("_id"))) {
-                retVal = new Document("_id", new ObjectId((String) 
doc.get("_id")));
+            Object idValue = doc.get("_id");
+            if (idValue instanceof ObjectId) {
+                retVal = new Document("_id", idValue);
+            } else if (idValue instanceof String && ObjectId.isValid((String) 
idValue)) {
+                retVal = new Document("_id", new ObjectId((String) idValue));
             } else {
-                retVal = new Document("_id", doc.get("_id"));
+                retVal = new Document("_id", idValue);
             }
         } else if (updateKey.contains(",")) {
             String[] parts = updateKey.split(",[\\s]*");
diff --git 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
index 4521ca11e9..b1c6bbc327 100644
--- 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
+++ 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -685,4 +686,41 @@ public class PutMongoIT extends MongoWriteTestBase {
             runner.clearTransferState();
         }
     }
+    @Test
+    public void testUpdateKey_IdVariousTypes() throws Exception {
+        TestRunner runner = init(PutMongo.class);
+
+        runner.setProperty(PutMongo.UPDATE_OPERATION_MODE, 
PutMongo.UPDATE_WITH_OPERATORS);
+        runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+        runner.setProperty(PutMongo.UPSERT, "true");
+        runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
+
+        Document docId = new Document("a", 1);
+        Integer numericId = 42;
+
+        List<Document> updates = List.of(
+                new Document(Map.of("_id", docId, "$set", Map.of("v", 1))),
+                new Document(Map.of("_id", numericId, "$set", Map.of("v", 3)))
+        );
+
+        for (Document update : updates) {
+            runner.enqueue(update.toJson());
+        }
+        runner.run(updates.size(), true, true);
+
+        runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
+        runner.assertTransferCount(PutMongo.REL_SUCCESS, updates.size());
+
+        // Verify _id Document preserved
+        Document r1 = collection.find(new Document("_id", docId)).first();
+        assertNotNull(r1);
+        assertInstanceOf(Document.class, r1.get("_id"));
+        assertEquals(docId, r1.get("_id"));
+
+        // Verify _id Number preserved
+        Document r3 = collection.find(new Document("_id", numericId)).first();
+        assertNotNull(r3);
+        assertInstanceOf(Integer.class, r3.get("_id"));
+        assertEquals(numericId, r3.get("_id"));
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
index 2059ac380e..9fada36802 100644
--- 
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
+++ 
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
@@ -72,4 +72,5 @@ public class PutMongoTest {
         it = results.iterator();
         assertTrue(it.next().toString().endsWith("Either the update query key 
or the update query field must be set."));
     }
+
 }

Reply via email to