This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c5ed5c51e9 NIFI-12224 Added Support for updateMany Method in PutMongo
c5ed5c51e9 is described below
commit c5ed5c51e93f5b1f9311b5a478e64b2f1320a9d4
Author: Umar Hussain <[email protected]>
AuthorDate: Sun Apr 7 02:47:03 2024 +0500
NIFI-12224 Added Support for updateMany Method in PutMongo
This closes #8610
Signed-off-by: David Handermann <[email protected]>
---
.../processors/mongodb/AbstractMongoProcessor.java | 62 +++-
.../apache/nifi/processors/mongodb/PutMongo.java | 93 ++++--
.../nifi/processors/mongodb/PutMongoRecord.java | 42 +--
.../apache/nifi/processors/mongodb/PutMongoIT.java | 337 ++++++++++++++++-----
.../nifi/processors/mongodb/PutMongoRecordIT.java | 9 +-
5 files changed, 387 insertions(+), 156 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index 9a5c8eb724..5d7b4d6843 100644
---
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -46,13 +47,13 @@ import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
public abstract class AbstractMongoProcessor extends AbstractProcessor {
+ public static final String ATTRIBUTE_MONGODB_UPDATE_MODE =
"mongodb.update.mode";
+
protected static final String JSON_TYPE_EXTENDED = "Extended";
protected static final String JSON_TYPE_STANDARD = "Standard";
protected static final AllowableValue JSON_EXTENDED = new
AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
@@ -156,14 +157,41 @@ public abstract class AbstractMongoProcessor extends
AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- static final List<PropertyDescriptor> descriptors;
+ static final List<PropertyDescriptor> descriptors = List.of(
+ CLIENT_SERVICE,
+ DATABASE_NAME,
+ COLLECTION_NAME
+ );
+
+ public enum UpdateMethod implements DescribedValue {
+ UPDATE_ONE("one", "Update One", "Updates only the first document that
matches the query."),
+ UPDATE_MANY("many", "Update Many", "Updates every document that
matches the query."),
+ UPDATE_FF_ATTRIBUTE("flowfile-attribute", "Use '" +
ATTRIBUTE_MONGODB_UPDATE_MODE + "' FlowFile attribute.",
+ "Use the value of the '" + ATTRIBUTE_MONGODB_UPDATE_MODE + "'
attribute of the incoming FlowFile. Acceptable values are 'one' and 'many'.");
+ private final String value;
+ private final String displayName;
+ private final String description;
+
+ UpdateMethod(final String value, final String displayName, final
String description) {
+ this.value = value;
+ this.displayName = displayName;
+ this.description = description;
+ }
- static {
- List<PropertyDescriptor> _temp = new ArrayList<>();
- _temp.add(CLIENT_SERVICE);
- _temp.add(DATABASE_NAME);
- _temp.add(COLLECTION_NAME);
- descriptors = Collections.unmodifiableList(_temp);
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
}
protected ObjectMapper objectMapper;
@@ -235,4 +263,20 @@ public abstract class AbstractMongoProcessor extends
AbstractProcessor {
objectMapper.setDateFormat(df);
}
}
+
+ /**
+ * Checks whether the given update method applies to the incoming FlowFile
+ * @param updateMethodToMatch the value against which the processor's mode is
compared
+ * @param configuredUpdateMethod the value configured on the running processor
+ * @param flowFile incoming FlowFile from which the update mode attribute is read
+ * @return true if the incoming FlowFile's update mode matches
updateMethodToMatch
+ */
+ protected boolean updateModeMatches(
+ UpdateMethod updateMethodToMatch, UpdateMethod configuredUpdateMethod,
FlowFile flowFile) {
+
+ return updateMethodToMatch == configuredUpdateMethod
+ || (UpdateMethod.UPDATE_FF_ATTRIBUTE == configuredUpdateMethod
+ &&
updateMethodToMatch.getValue().equalsIgnoreCase(flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE)));
+ }
+
}
diff --git
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
index 526c041742..e6e8d23c14 100644
---
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
+++
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
@@ -21,10 +21,13 @@ import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.result.UpdateResult;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
@@ -42,6 +45,7 @@ import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;
+import org.bson.BsonValue;
import org.bson.Document;
import org.bson.types.ObjectId;
@@ -49,7 +53,6 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,12 +61,21 @@ import java.util.Set;
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Writes the contents of a FlowFile to MongoDB")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttributes({
+ @WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
description = "The match count from result if update/upsert is performed,
otherwise not set."),
+ @WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
description = "The modify count from result if update/upsert is performed,
otherwise not set."),
+ @WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPSERT_ID, description =
"The '_id' hex value if upsert is performed, otherwise not set.")
+})
public class PutMongo extends AbstractMongoProcessor {
static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
.description("All FlowFiles that are written to MongoDB are routed
to this relationship").build();
static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to MongoDB are
routed to this relationship").build();
+ static final String ATTRIBUTE_UPDATE_MATCH_COUNT =
"mongo.put.update.match.count";
+ static final String ATTRIBUTE_UPDATE_MODIFY_COUNT =
"mongo.put.update.modify.count";
+ static final String ATTRIBUTE_UPSERT_ID = "mongo.put.upsert.id";
+
static final String MODE_INSERT = "insert";
static final String MODE_UPDATE = "update";
@@ -82,39 +94,50 @@ public class PutMongo extends AbstractMongoProcessor {
.description("When true, inserts a document if no document matches the
update query criteria; this property is valid only when using update mode, "
+ "otherwise it is ignored")
.required(true)
+ .dependsOn(MODE, MODE_UPDATE)
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();
static final PropertyDescriptor UPDATE_QUERY_KEY = new
PropertyDescriptor.Builder()
.name("Update Query Key")
- .description("Key name used to build the update query criteria; this
property is valid only when using update mode, "
- + "otherwise it is ignored. Example: _id")
+ .description("One or more comma-separated document key names used to
build the update query criteria, such as _id")
.required(false)
+ .dependsOn(MODE, MODE_UPDATE)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor UPDATE_QUERY = new
PropertyDescriptor.Builder()
.name("putmongo-update-query")
.displayName("Update Query")
- .description("Specify a full MongoDB query to be used for the lookup
query to do an update/upsert.")
+ .description("Specify a full MongoDB query to be used for the lookup
query to do an update/upsert. NOTE: this field is ignored if the '%s' value is
not empty."
+ .formatted(UPDATE_QUERY_KEY.getDisplayName()))
.required(false)
+ .dependsOn(MODE, MODE_UPDATE)
.addValidator(JsonValidator.INSTANCE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor UPDATE_MODE = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor UPDATE_OPERATION_MODE = new
PropertyDescriptor.Builder()
.displayName("Update Mode")
.name("put-mongo-update-mode")
.required(true)
+ .dependsOn(MODE, MODE_UPDATE)
.allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS)
- .defaultValue(UPDATE_WITH_DOC.getValue())
+ .defaultValue(UPDATE_WITH_DOC)
.description("Choose an update mode. You can either supply a JSON
document to use as a direct replacement " +
"or specify a document that contains update operators like
$set, $unset, and $inc. " +
"When Operators mode is enabled, the flowfile content is
expected to be the operator part " +
"for example: {$set:{\"key\":
\"value\"},$inc:{\"count\":1234}} and the update query will come " +
"from the configured Update Query property.")
.build();
+ static final PropertyDescriptor UPDATE_METHOD = new
PropertyDescriptor.Builder()
+ .name("Update Method")
+ .dependsOn(UPDATE_OPERATION_MODE, UPDATE_WITH_OPERATORS)
+ .description("MongoDB method for running collection update operations,
such as updateOne or updateMany")
+ .allowableValues(UpdateMethod.class)
+ .defaultValue(UpdateMethod.UPDATE_ONE)
+ .build();
static final PropertyDescriptor CHARACTER_SET = new
PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the data is encoded")
@@ -123,24 +146,20 @@ public class PutMongo extends AbstractMongoProcessor {
.defaultValue("UTF-8")
.build();
- private final static Set<Relationship> relationships;
+ private final static Set<Relationship> relationships = Set.of(REL_SUCCESS,
REL_FAILURE);
+
private final static List<PropertyDescriptor> propertyDescriptors;
static {
- List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
- _propertyDescriptors.addAll(descriptors);
+ List<PropertyDescriptor> _propertyDescriptors = new
ArrayList<>(descriptors);
_propertyDescriptors.add(MODE);
_propertyDescriptors.add(UPSERT);
_propertyDescriptors.add(UPDATE_QUERY_KEY);
_propertyDescriptors.add(UPDATE_QUERY);
- _propertyDescriptors.add(UPDATE_MODE);
+ _propertyDescriptors.add(UPDATE_OPERATION_MODE);
+ _propertyDescriptors.add(UPDATE_METHOD);
_propertyDescriptors.add(CHARACTER_SET);
propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
-
- final Set<Relationship> _relationships = new HashSet<>();
- _relationships.add(REL_SUCCESS);
- _relationships.add(REL_FAILURE);
- relationships = Collections.unmodifiableSet(_relationships);
}
@Override
@@ -183,7 +202,7 @@ public class PutMongo extends AbstractMongoProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final FlowFile flowFile = session.get();
+ FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
@@ -191,8 +210,8 @@ public class PutMongo extends AbstractMongoProcessor {
final ComponentLog logger = getLogger();
final Charset charset =
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
- final String mode = context.getProperty(MODE).getValue();
- final String updateMode = context.getProperty(UPDATE_MODE).getValue();
+ final String processorMode = context.getProperty(MODE).getValue();
+ final String updateOperationMode =
context.getProperty(UPDATE_OPERATION_MODE).getValue();
final WriteConcern writeConcern = clientService.getWriteConcern();
try {
@@ -202,10 +221,10 @@ public class PutMongo extends AbstractMongoProcessor {
session.read(flowFile, in -> StreamUtils.fillBuffer(in, content,
true));
// parse
- final Object doc = (mode.equals(MODE_INSERT) ||
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+ final Object doc = (processorMode.equals(MODE_INSERT) ||
(processorMode.equals(MODE_UPDATE) &&
updateOperationMode.equals(UPDATE_WITH_DOC.getValue())))
? Document.parse(new String(content, charset)) :
BasicDBObject.parse(new String(content, charset));
- if (MODE_INSERT.equalsIgnoreCase(mode)) {
+ if (MODE_INSERT.equals(processorMode)) {
collection.insertOne((Document) doc);
logger.info("inserted {} into MongoDB", flowFile);
} else {
@@ -213,21 +232,39 @@ public class PutMongo extends AbstractMongoProcessor {
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey =
context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String filterQuery =
context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
- final Document query;
+ final Document updateQuery;
- if (!StringUtils.isBlank(updateKey)) {
- query = parseUpdateKey(updateKey, (Map) doc);
+ if (StringUtils.isNotBlank(updateKey)) {
+ updateQuery = parseUpdateKey(updateKey, (Map) doc);
removeUpdateKeys(updateKey, (Map) doc);
} else {
- query = Document.parse(filterQuery);
+ updateQuery = Document.parse(filterQuery);
}
-
- if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
- collection.replaceOne(query, (Document) doc, new
ReplaceOptions().upsert(upsert));
+ UpdateResult updateResult;
+ if (updateOperationMode.equals(UPDATE_WITH_DOC.getValue())) {
+ updateResult = collection.replaceOne(updateQuery,
(Document) doc, new ReplaceOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject) doc;
update.remove(updateKey);
- collection.updateOne(query, update, new
UpdateOptions().upsert(upsert));
+ UpdateOptions updateOptions = new
UpdateOptions().upsert(upsert);
+ UpdateMethod updateQueryMode =
context.getProperty(UPDATE_METHOD).asAllowableValue(UpdateMethod.class);
+
+ if (this.updateModeMatches(UpdateMethod.UPDATE_ONE,
updateQueryMode, flowFile)) {
+ updateResult = collection.updateOne(updateQuery,
update, updateOptions);
+ } else if
(this.updateModeMatches(UpdateMethod.UPDATE_MANY, updateQueryMode, flowFile)) {
+ updateResult = collection.updateMany(updateQuery,
update, updateOptions);
+ } else {
+ String flowfileUpdateMode =
flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE);
+ throw new ProcessException("Unrecognized '" +
ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
+ }
+ }
+
+ flowFile = session.putAttribute(flowFile,
ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
+ flowFile = session.putAttribute(flowFile,
ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount()));
+ BsonValue upsertedId = updateResult.getUpsertedId();
+ if (upsertedId != null) {
+ String id = upsertedId.isString() ?
upsertedId.asString().getValue() :
upsertedId.asObjectId().getValue().toString();
+ flowFile = session.putAttribute(flowFile,
ATTRIBUTE_UPSERT_ID, id);
}
logger.info("updated {} into MongoDB", flowFile);
}
diff --git
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index 3679d89375..3cbd70f95d 100644
---
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -30,7 +30,6 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@@ -54,7 +53,6 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -68,23 +66,17 @@ import java.util.Set;
"by the \"Batch Size\" configuration property. This value should be
set to a reasonable size to ensure " +
"that MongoDB is not overloaded with too many operations at once.")
@ReadsAttribute(
- attribute = PutMongoRecord.MONGODB_UPDATE_MODE,
+ attribute = AbstractMongoProcessor.ATTRIBUTE_MONGODB_UPDATE_MODE,
description = "Configurable parameter for controlling update mode on a
per-flowfile basis." +
" Acceptable values are 'one' and 'many' and controls whether a single
incoming record should update a single or multiple Mongo documents."
)
public class PutMongoRecord extends AbstractMongoProcessor {
- static final String MONGODB_UPDATE_MODE = "mongodb.update.mode";
static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success")
.description("All FlowFiles that are written to MongoDB are routed
to this relationship").build();
static final Relationship REL_FAILURE = new
Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to MongoDB are
routed to this relationship").build();
- static final AllowableValue UPDATE_ONE = new AllowableValue("one", "Update
One", "Updates only the first document that matches the query.");
- static final AllowableValue UPDATE_MANY = new AllowableValue("many",
"Update Many", "Updates every document that matches the query.");
- static final AllowableValue UPDATE_FF_ATTRIBUTE = new
AllowableValue("flowfile-attribute", "Use '" + MONGODB_UPDATE_MODE + "'
flowfile attribute.",
- "Use the value of the '" + MONGODB_UPDATE_MODE + "' attribute of the
incoming flowfile. Acceptable values are 'one' and 'many'.");
-
static final PropertyDescriptor RECORD_READER_FACTORY = new
PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
@@ -141,8 +133,8 @@ public class PutMongoRecord extends AbstractMongoProcessor {
.displayName("Update Mode")
.dependsOn(UPDATE_KEY_FIELDS)
.description("Choose between updating a single document or multiple
documents per incoming record.")
- .allowableValues(UPDATE_ONE, UPDATE_MANY, UPDATE_FF_ATTRIBUTE)
- .defaultValue(UPDATE_ONE.getValue())
+ .allowableValues(UpdateMethod.class)
+ .defaultValue(UpdateMethod.UPDATE_ONE)
.build();
private final static Set<Relationship> relationships;
@@ -159,10 +151,7 @@ public class PutMongoRecord extends AbstractMongoProcessor
{
_propertyDescriptors.add(UPDATE_MODE);
propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
- final Set<Relationship> _relationships = new HashSet<>();
- _relationships.add(REL_SUCCESS);
- _relationships.add(REL_FAILURE);
- relationships = Collections.unmodifiableSet(_relationships);
+ relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}
@Override
@@ -230,22 +219,22 @@ public class PutMongoRecord extends
AbstractMongoProcessor {
WriteModel<Document> writeModel;
if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
Bson[] filters =
buildFilters(updateKeyFieldPathToFieldChain, readyToUpsert);
-
- if (updateModeMatches(UPDATE_ONE.getValue(), context,
flowFile)) {
+ UpdateMethod mongoUpdateMode =
context.getProperty(UPDATE_MODE).asAllowableValue(UpdateMethod.class);
+ if (this.updateModeMatches(UpdateMethod.UPDATE_ONE,
mongoUpdateMode, flowFile)) {
writeModel = new UpdateOneModel<>(
Filters.and(filters),
new Document("$set", readyToUpsert),
new UpdateOptions().upsert(true)
);
- } else if (updateModeMatches(UPDATE_MANY.getValue(),
context, flowFile)) {
+ } else if
(this.updateModeMatches(UpdateMethod.UPDATE_MANY, mongoUpdateMode, flowFile)) {
writeModel = new UpdateManyModel<>(
Filters.and(filters),
new Document("$set", readyToUpsert),
new UpdateOptions().upsert(true)
);
} else {
- String flowfileUpdateMode =
flowFile.getAttribute(MONGODB_UPDATE_MODE);
- throw new ProcessException("Unrecognized '" +
MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
+ String flowfileUpdateMode =
flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE);
+ throw new ProcessException("Unrecognized '" +
ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
}
} else {
writeModel = new InsertOneModel<>(readyToUpsert);
@@ -334,17 +323,4 @@ public class PutMongoRecord extends AbstractMongoProcessor
{
return filters;
}
-
- private boolean updateModeMatches(String updateModeToMatch, ProcessContext
context, FlowFile flowFile) {
- String updateMode = context.getProperty(UPDATE_MODE).getValue();
-
- boolean updateModeMatches = updateMode.equals(updateModeToMatch)
- ||
- (
- updateMode.equals(UPDATE_FF_ATTRIBUTE.getValue())
- &&
updateModeToMatch.equalsIgnoreCase(flowFile.getAttribute(MONGODB_UPDATE_MODE))
- );
-
- return updateModeMatches;
- }
}
diff --git
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
index 413af59c26..d8c2849dbc 100644
---
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
+++
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoIT.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.mongodb;
import com.mongodb.client.MongoCursor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.mongodb.AbstractMongoProcessor.UpdateMethod;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
@@ -108,62 +109,125 @@ public class PutMongoIT extends MongoWriteTestBase {
}
@Test
- public void testBlankUpdateKey() throws Exception {
+ public void testBlankUpdateKeyInUpdateMode() throws Exception {
TestRunner runner = init(PutMongo.class);
+ runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, " ");
runner.assertNotValid();
}
@Test
- public void testUpdateQuery() throws Exception {
+ public void testUpdateOneWithOperatorByQuery() throws Exception {
TestRunner runner = init(PutMongo.class);
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering");
collection.insertOne(document);
- String updateBody = "{\n" +
- "\t\"$set\": {\n" +
- "\t\t\"email\": \"[email protected]\",\n" +
- "\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
- "\t},\n" +
- "\t\"$inc\": {\n" +
- "\t\t\"writes\": 1\n" +
- "\t}\n" +
- "}";
+ Document updateBody = new Document(Map.of(
+ "$set", Map.of(
+ "email", "[email protected]",
+ "grade", "Sr. Principle Eng."
+ ),
+ "$inc", Map.of(
+ "writes", 1
+ )
+ ));
Map<String, String> attr = new HashMap<>();
attr.put("mongo.update.query", document.toJson());
- runner.setProperty(PutMongo.UPDATE_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}");
runner.setValidateExpressionUsage(true);
- runner.enqueue(updateBody, attr);
- updateTests(runner, document);
+ runner.enqueue(updateBody.toJson(), attr);
+ updateOneTests(runner, document);
+ }
+ @Test
+ public void testUpdateManyWithOperatorByQuery() throws Exception {
+ TestRunner runner = init(PutMongo.class);
+ Map<String, String> docKeys = Map.of(
+ "name", "John Smith",
+ "department", "Engineering"
+ );
+
+ collection.insertOne(new Document(docKeys));
+ collection.insertOne(new Document(docKeys));
+ Document updateBody = new Document(Map.of(
+ "$set", Map.of(
+ "email", "[email protected]",
+ "grade", "Sr. Principle Eng."
+ ),
+ "$inc", Map.of(
+ "writes", 1
+ )
+ ));
+ Document search = new Document(docKeys);
+ Map<String, String> attr = new HashMap<>();
+ attr.put("mongo.update.query", search.toJson());
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.UPDATE_METHOD, UpdateMethod.UPDATE_MANY);
+ runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+ runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}");
+ runner.setValidateExpressionUsage(true);
+ runner.enqueue(updateBody.toJson(), attr);
+ updateManyTests(runner, search, 2);
}
@Test
- public void testUpdateBySimpleKey() throws Exception {
+ public void testUpdateOneBySimpleKey() throws Exception {
TestRunner runner = init(PutMongo.class);
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering");
collection.insertOne(document);
- String updateBody = "{\n" +
- "\t\"name\": \"John Smith\",\n" +
- "\t\"$set\": {\n" +
- "\t\t\"email\": \"[email protected]\",\n" +
- "\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
- "\t},\n" +
- "\t\"$inc\": {\n" +
- "\t\t\"writes\": 1\n" +
- "\t}\n" +
- "}";
+ Document updateBody = new Document(Map.of(
+ "name", "John Smith",
+ "$set", Map.of(
+ "email", "[email protected]",
+ "grade", "Sr. Principle Eng."
+ ),
+ "$inc", Map.of(
+ "writes", 1
+ )
+ ));
+ runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name");
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+ runner.setValidateExpressionUsage(true);
+ runner.enqueue(updateBody.toJson());
+ updateOneTests(runner, document);
+ }
+
+
+ @Test
+ public void testUpdateManyWithOperatorBySimpleKey() throws Exception {
+ TestRunner runner = init(PutMongo.class);
+ Map<String, String> docKeys = Map.of(
+ "name", "John Smith",
+ "department", "Engineering"
+ );
+
+ collection.insertOne(new Document(docKeys));
+ collection.insertOne(new Document(docKeys));
+
+ Document updateBody = new Document(Map.of(
+ "name", "John Smith",
+ "$set", Map.of(
+ "email", "[email protected]",
+ "grade", "Sr. Principle Eng."
+ ),
+ "$inc", Map.of(
+ "writes", 1
+ )
+ ));
+ Document search = new Document(docKeys);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name");
- runner.setProperty(PutMongo.UPDATE_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.UPDATE_METHOD, UpdateMethod.UPDATE_MANY);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setValidateExpressionUsage(true);
- runner.enqueue(updateBody);
- updateTests(runner, document);
+ runner.enqueue(updateBody.toJson());
+ updateManyTests(runner, search, 2);
}
@Test
@@ -186,23 +250,28 @@ public class PutMongoIT extends MongoWriteTestBase {
.append("name", "John Smith")
.append("department", "Engineering");
collection.insertOne(document);
- String updateBody = "{\n" +
- "\t\"name\": \"John Smith\",\n" +
- "\t\"department\": \"Engineering\",\n" +
- "\t\"contacts\": {\n" +
- "\t\t\"phone\": \"555-555-5555\",\n" +
- "\t\t\"email\": \"[email protected]\",\n" +
- "\t\t\"twitter\": \"@JohnSmith\"\n" +
- "\t}\n" +
- "}";
- runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_DOC);
+ Document updateBody = new Document(Map.of(
+ "name", "John Smith",
+ "department", "Engineering",
+ "contacts", Map.of(
+ "phone", "555-555-5555",
+ "email", "[email protected]",
+ "twitter", "@JohnSmith"
+ )
+ ));
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_DOC);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setValidateExpressionUsage(true);
- runner.enqueue(updateBody);
+ runner.enqueue(updateBody.toJson());
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
+ MockFlowFile out =
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).getFirst();
+ out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
String.valueOf(1));
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
String.valueOf(1));
+
MongoCursor<Document> cursor = collection.find(document).iterator();
Document found = cursor.next();
assertEquals(found.get("name"), document.get("name"));
@@ -216,7 +285,7 @@ public class PutMongoIT extends MongoWriteTestBase {
}
@Test
- public void testUpdateByComplexKey() throws Exception {
+ public void testUpdateOneByComplexKey() throws Exception {
TestRunner runner = init(PutMongo.class);
Document document = new Document()
.append("name", "John Smith")
@@ -224,25 +293,30 @@ public class PutMongoIT extends MongoWriteTestBase {
.append("contacts", new Document().append("email",
"[email protected]")
.append("phone", "555-555-5555"));
collection.insertOne(document);
- String updateBody = "{\n" +
- "\t\"contacts.phone\": \"555-555-5555\",\n" +
- "\t\"contacts.email\": \"[email protected]\",\n" +
- "\t\"$set\": {\n" +
- "\t\t\"contacts.twitter\": \"@JohnSmith\"\n" +
- "\t},\n" +
- "\t\"$inc\": {\n" +
- "\t\t\"writes\": 1\n" +
- "\t}\n" +
- "}";
+ Document updateBody = new Document(Map.of(
+ "contacts.phone", "555-555-5555",
+ "contacts.email", "[email protected]",
+ "$set", Map.of(
+ "contacts.twitter", "@JohnSmith"
+ ),
+ "$inc", Map.of(
+ "writes", 1
+ )
+ ));
runner.setProperty(PutMongo.UPDATE_QUERY_KEY,
"contacts.phone,contacts.email");
- runner.setProperty(PutMongo.UPDATE_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setValidateExpressionUsage(true);
- runner.enqueue(updateBody);
+ runner.enqueue(updateBody.toJson());
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
+ MockFlowFile out =
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).getFirst();
+ out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
String.valueOf(1));
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
String.valueOf(1));
+
MongoCursor<Document> iterator = collection.find(new Document("name",
"John Smith")).iterator();
assertTrue(iterator.hasNext(), "Document did not come back.");
Document val = iterator.next();
@@ -252,17 +326,75 @@ public class PutMongoIT extends MongoWriteTestBase {
assertTrue(val.containsKey("writes") && val.get("writes").equals(1));
}
- private void updateTests(TestRunner runner, Document document) {
+ @Test
+ public void testUpdateManyWithOperatorByComplexKey() throws Exception {
+ TestRunner runner = init(PutMongo.class);
+ Map<String, Object> data = Map.of("name", "John Smith",
+ "department", "Engineering",
+ "contacts", Map.of(
+ "email", "[email protected]",
+ "phone", "555-555-5555"
+ )
+ );
+ collection.insertOne(new Document(data));
+ collection.insertOne(new Document(data));
+ Document updateBody = new Document(Map.of(
+ "contacts.phone", "555-555-5555",
+ "contacts.email", "[email protected]",
+ "$set", Map.of(
+ "contacts.twitter", "@JohnSmith"
+ ),
+ "$inc", Map.of(
+ "writes", 1
+ )
+ ));
+ runner.setProperty(PutMongo.UPDATE_QUERY_KEY,
"contacts.phone,contacts.email");
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.UPDATE_METHOD, UpdateMethod.UPDATE_MANY);
+ runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
+ runner.setValidateExpressionUsage(true);
+ runner.enqueue(updateBody.toJson());
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
+ MockFlowFile out =
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).getFirst();
+ out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
String.valueOf(2));
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
String.valueOf(2));
+
+ MongoCursor<Document> iterator = collection.find(new Document("name",
"John Smith")).iterator();
+ for (int i = 1; i <= 2; i++) {
+ assertTrue(iterator.hasNext(), "Document %d did not come
back.".formatted(i));
+ Document val = iterator.next();
+ Map contacts = (Map) val.get("contacts");
+ assertNotNull(contacts, "Document %d's contacts
null".formatted(i));
+ assertTrue(contacts.containsKey("twitter") &&
contacts.get("twitter").equals("@JohnSmith"), "Document %d's twitter
invalid".formatted(i));
+ assertTrue(val.containsKey("writes") &&
val.get("writes").equals(1), "Document %d's writes invalid".formatted(i));
+
+ }
+ }
+
+ private void updateOneTests(TestRunner runner, Document document) {
+ updateManyTests(runner, document, 1);
+ }
+
+ private void updateManyTests(TestRunner runner, Document document, int
updateCount) {
+ runner.run();
+ runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
+ runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
MongoCursor<Document> iterator = collection.find(document).iterator();
- assertTrue(iterator.hasNext(), "Document did not come back.");
- Document val = iterator.next();
- assertTrue(val.containsKey("email") &&
val.get("email").equals("[email protected]"));
- assertTrue(val.containsKey("grade") && val.get("grade").equals("Sr.
Principle Eng."));
- assertTrue(val.containsKey("writes") && val.get("writes").equals(1));
+ MockFlowFile out =
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).getFirst();
+ out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
String.valueOf(updateCount));
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
String.valueOf(updateCount));
+ for (int i = 1; i <= updateCount; i++) {
+ assertTrue(iterator.hasNext(), "Document number %s did not come
back.".formatted(i));
+ Document val = iterator.next();
+ assertTrue(val.containsKey("email") &&
val.get("email").equals("[email protected]"));
+ assertTrue(val.containsKey("grade") &&
val.get("grade").equals("Sr. Principle Eng."));
+ assertTrue(val.containsKey("writes") &&
val.get("writes").equals(1));
+ }
}
@Test
@@ -341,13 +473,16 @@ public class PutMongoIT extends MongoWriteTestBase {
Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc);
- runner.setProperty(PutMongo.MODE, "update");
+ runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out =
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
+ out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
String.valueOf(0));
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
String.valueOf(0));
// nothing was in collection, so nothing to update since upsert
defaults to false
assertEquals(0, collection.countDocuments());
@@ -364,7 +499,7 @@ public class PutMongoIT extends MongoWriteTestBase {
Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc);
- runner.setProperty(PutMongo.MODE, "update");
+ runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setProperty(PutMongo.UPSERT, "true");
runner.enqueue(bytes);
runner.run();
@@ -373,6 +508,10 @@ public class PutMongoIT extends MongoWriteTestBase {
MockFlowFile out =
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPSERT_ID,
doc.getString("_id"));
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
String.valueOf(0));
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
String.valueOf(0));
+
// verify 1 doc inserted into the collection
assertEquals(1, collection.countDocuments());
assertEquals(doc, collection.find().first());
@@ -393,6 +532,10 @@ public class PutMongoIT extends MongoWriteTestBase {
MockFlowFile out =
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPSERT_ID,
oidDocument.getObjectId("_id").toString());
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
String.valueOf(0));
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
String.valueOf(0));
+
// verify 1 doc inserted into the collection
assertEquals(1, collection.countDocuments());
assertEquals(oidDocument, collection.find().first());
@@ -414,14 +557,16 @@ public class PutMongoIT extends MongoWriteTestBase {
byte[] bytes = documentToByteArray(doc);
- runner.setProperty(PutMongo.MODE, "update");
+ runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out =
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
-
+ out.assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
String.valueOf(1));
+ out.assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
String.valueOf(1));
assertEquals(1, collection.countDocuments());
assertEquals(doc, collection.find().first());
}
@@ -429,22 +574,35 @@ public class PutMongoIT extends MongoWriteTestBase {
@Test
public void testUpsertWithOperators() throws Exception {
TestRunner runner = init(PutMongo.class);
- String upsert = "{\n" +
- " \"_id\": \"Test\",\n" +
- " \"$push\": {\n" +
- " \"testArr\": { \"msg\": \"Hi\" }\n" +
- " }\n" +
- "}";
- runner.setProperty(PutMongo.UPDATE_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ Document upsert = new Document(Map.of(
+ "_id", "Test",
+ "$push", Map.of(
+ "testArr", Map.of(
+ "msg", "Hi"
+ )
+ )
+ ));
+
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
runner.setProperty(PutMongo.MODE, "update");
runner.setProperty(PutMongo.UPSERT, "true");
for (int x = 0; x < 3; x++) {
- runner.enqueue(upsert.getBytes());
+ runner.enqueue(upsert.toJson());
}
runner.run(3, true, true);
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 3);
+ List<MockFlowFile> flowFilesForRelationship =
runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
+ MockFlowFile upsertOutput = flowFilesForRelationship.removeFirst();
+ upsertOutput.assertAttributeEquals(PutMongo.ATTRIBUTE_UPSERT_ID,
"Test");
+
+ // test next flow files for update attributes
+ for (int i = 0; i < flowFilesForRelationship.size(); i++) {
+
flowFilesForRelationship.get(i).assertAttributeNotExists(PutMongo.ATTRIBUTE_UPSERT_ID);
+
flowFilesForRelationship.get(i).assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT,
String.valueOf(1));
+
flowFilesForRelationship.get(i).assertAttributeEquals(PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT,
String.valueOf(1));
+ }
Document query = new Document("_id", "Test");
Document result = collection.find(query).first();
@@ -454,7 +612,7 @@ public class PutMongoIT extends MongoWriteTestBase {
for (int index = 0; index < array.size(); index++) {
Document doc = (Document) array.get(index);
String msg = doc.getString("msg");
- assertNotNull("Msg was null", msg);
+ assertNotNull(msg, "Msg was null");
assertEquals(msg, "Hi", "Msg had wrong value");
}
}
@@ -477,26 +635,42 @@ public class PutMongoIT extends MongoWriteTestBase {
@Test
public void testNiFi_4759_Regressions() throws Exception {
TestRunner runner = init(PutMongo.class);
- String[] upserts = new String[]{
- "{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" }
}",
- "{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\":
\"Hello, world\" } }",
- "{ \"updateKey\": \"12345\", \"$set\": { \"msg\": \"Hello,
world\" } }"
- };
+
+ List<Document> upserts = List.of(
+ new Document(Map.of(
+ "_id", "12345",
+ "$set", Map.of(
+ "msg", "Hello, world"
+ )
+ )),
+ new Document(Map.of(
+ "_id", "5a5617b9c1f5de6d8276e87d",
+ "$set", Map.of(
+ "msg", "Hello, world"
+ )
+ )),
+ new Document(Map.of(
+ "updateKey", "12345",
+ "$set", Map.of(
+ "msg", "Hello, world"
+ )
+ ))
+ );
String[] updateKeyProps = new String[] {"_id", "_id", "updateKey"};
Object[] updateKeys = new Object[] {"12345", new
ObjectId("5a5617b9c1f5de6d8276e87d"), "12345"};
- int index = 0;
- runner.setProperty(PutMongo.UPDATE_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
- runner.setProperty(PutMongo.MODE, "update");
+ runner.setProperty(PutMongo.UPDATE_OPERATION_MODE,
PutMongo.UPDATE_WITH_OPERATORS);
+ runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setProperty(PutMongo.UPSERT, "true");
final int LIMIT = 2;
- for (String upsert : upserts) {
+ for (int index = 0; index < upserts.size(); index++) {
+ Document upsert = upserts.get(index);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY,
updateKeyProps[index]);
for (int x = 0; x < LIMIT; x++) {
- runner.enqueue(upsert);
+ runner.enqueue(upsert.toJson());
}
runner.run(LIMIT, true, true);
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
@@ -508,7 +682,6 @@ public class PutMongoIT extends MongoWriteTestBase {
assertNotNull(result, "Result was null");
assertEquals(1, collection.countDocuments(query), "Count was
wrong");
runner.clearTransferState();
- index++;
}
}
}
diff --git
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
index e63d89b978..1942807c1d 100644
---
a/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
+++
b/nifi-extension-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
@@ -24,6 +24,7 @@ import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.mongodb.AbstractMongoProcessor.UpdateMethod;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
@@ -423,7 +424,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
TestRunner updateRunner = init();
updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
- updateRunner.setProperty(PutMongoRecord.UPDATE_MODE,
PutMongoRecord.UPDATE_MANY.getValue());
+ updateRunner.setProperty(PutMongoRecord.UPDATE_MODE,
UpdateMethod.UPDATE_MANY.getValue());
recordReader.addSchemaField("team", RecordFieldType.STRING);
recordReader.addSchemaField("color", RecordFieldType.STRING);
@@ -486,7 +487,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
TestRunner updateRunner = init();
updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
- updateRunner.setProperty(PutMongoRecord.UPDATE_MODE,
PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+ updateRunner.setProperty(PutMongoRecord.UPDATE_MODE,
UpdateMethod.UPDATE_FF_ATTRIBUTE.getValue());
recordReader.addSchemaField("team", RecordFieldType.STRING);
recordReader.addSchemaField("color", RecordFieldType.STRING);
@@ -526,7 +527,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
MockFlowFile flowFile = new MockFlowFile(1);
flowFile.putAttributes(new HashMap<String, String>() {{
- put(PutMongoRecord.MONGODB_UPDATE_MODE, "many");
+ put(AbstractMongoProcessor.ATTRIBUTE_MONGODB_UPDATE_MODE,
"many");
}});
updateRunner.enqueue(flowFile);
updateRunner.run();
@@ -551,7 +552,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
TestRunner runner = init();
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
- runner.setProperty(PutMongoRecord.UPDATE_MODE,
PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+ runner.setProperty(PutMongoRecord.UPDATE_MODE,
UpdateMethod.UPDATE_FF_ATTRIBUTE.getValue());
recordReader.addSchemaField("team", RecordFieldType.STRING);
recordReader.addSchemaField("color", RecordFieldType.STRING);