This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 001f353522 NIFI-15184 Enhanced PutMongo to handle id Field values of
various types correctly (#10497)
001f353522 is described below
commit 001f353522266acca7e873bc4163713e89d7bfd0
Author: Ravinarayan Singh <[email protected]>
AuthorDate: Thu Nov 6 12:49:09 2025 -0800
NIFI-15184 Enhanced PutMongo to handle id Field values of various types
correctly (#10497)
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/processors/mongodb/PutMongo.java | 21 ++++++++----
.../apache/nifi/processors/mongodb/PutMongoIT.java | 38 ++++++++++++++++++++++
.../nifi/processors/mongodb/PutMongoTest.java | 1 +
3 files changed, 54 insertions(+), 6 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index 46cf06216a..20f77955ee 100644
---
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -265,7 +265,15 @@ public class PutMongo extends AbstractMongoProcessor {
flowFile = session.putAttribute(flowFile,
ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount()));
BsonValue upsertedId = updateResult.getUpsertedId();
if (upsertedId != null) {
- String id = upsertedId.isString() ?
upsertedId.asString().getValue() :
upsertedId.asObjectId().getValue().toString();
+ final String id;
+ if (upsertedId.isString()) {
+ id = upsertedId.asString().getValue();
+ } else if (upsertedId.isObjectId()) {
+ id = upsertedId.asObjectId().getValue().toString();
+ } else {
+ // Fallback for non-String/ObjectId identifiers (e.g.,
Document, Int32)
+ id = upsertedId.toString();
+ }
flowFile = session.putAttribute(flowFile,
ATTRIBUTE_UPSERT_ID, id);
}
logger.info("updated {} into MongoDB", flowFile);
@@ -299,12 +307,13 @@ public class PutMongo extends AbstractMongoProcessor {
private Document parseUpdateKey(String updateKey, Map doc) {
Document retVal;
if (updateKey.equals("_id")) {
- if (doc.get("_id") instanceof ObjectId) {
- retVal = new Document("_id", doc.get("_id"));
- } else if (ObjectId.isValid((String) doc.get("_id"))) {
- retVal = new Document("_id", new ObjectId((String)
doc.get("_id")));
+ Object idValue = doc.get("_id");
+ if (idValue instanceof ObjectId) {
+ retVal = new Document("_id", idValue);
+ } else if (idValue instanceof String && ObjectId.isValid((String)
idValue)) {
+ retVal = new Document("_id", new ObjectId((String) idValue));
} else {
- retVal = new Document("_id", doc.get("_id"));
+ retVal = new Document("_id", idValue);
}
} else if (updateKey.contains(",")) {
String[] parts = updateKey.split(",[\\s]*");
diff --git
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
index 4521ca11e9..b1c6bbc327 100644
---
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
+++
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -685,4 +686,41 @@ public class PutMongoIT extends MongoWriteTestBase {
runner.clearTransferState();
}
}
+ @Test
+ public void testUpdateKey_IdVariousTypes() throws Exception {
+ TestRunner runner = init(PutMongo.class);
+
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+ runner.setProperty(PutMongo.UPSERT, "true");
+ runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
+
+ Document docId = new Document("a", 1);
+ Integer numericId = 42;
+
+ List<Document> updates = List.of(
+ new Document(Map.of("_id", docId, "$set", Map.of("v", 1))),
+ new Document(Map.of("_id", numericId, "$set", Map.of("v", 3)))
+ );
+
+ for (Document update : updates) {
+ runner.enqueue(update.toJson());
+ }
+ runner.run(updates.size(), true, true);
+
+ runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
+ runner.assertTransferCount(PutMongo.REL_SUCCESS, updates.size());
+
+ // Verify _id Document preserved
+ Document r1 = collection.find(new Document("_id", docId)).first();
+ assertNotNull(r1);
+ assertInstanceOf(Document.class, r1.get("_id"));
+ assertEquals(docId, r1.get("_id"));
+
+ // Verify _id Number preserved
+ Document r3 = collection.find(new Document("_id", numericId)).first();
+ assertNotNull(r3);
+ assertInstanceOf(Integer.class, r3.get("_id"));
+ assertEquals(numericId, r3.get("_id"));
+ }
}
diff --git
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
index 2059ac380e..9fada36802 100644
---
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
+++
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoTest.java
@@ -72,4 +72,5 @@ public class PutMongoTest {
it = results.iterator();
assertTrue(it.next().toString().endsWith("Either the update query key
or the update query field must be set."));
}
+
}