dan-s1 commented on code in PR #8610:
URL: https://github.com/apache/nifi/pull/8610#discussion_r1572451044
##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java:
##########
@@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
session.read(flowFile, in -> StreamUtils.fillBuffer(in, content,
true));
// parse
- final Object doc = (mode.equals(MODE_INSERT) ||
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+ final Object doc = (processorMode.equals(MODE_INSERT) ||
(processorMode.equals(MODE_UPDATE) &&
flowfileType.equals(UPDATE_WITH_DOC.getValue())))
? Document.parse(new String(content, charset)) :
BasicDBObject.parse(new String(content, charset));
- if (MODE_INSERT.equalsIgnoreCase(mode)) {
+ if (MODE_INSERT.equalsIgnoreCase(processorMode)) {
collection.insertOne((Document)doc);
logger.info("inserted {} into MongoDB", new Object[] {
flowFile });
} else {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey =
context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String filterQuery =
context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
- final Document query;
+ final Document updateQuery;
- if (!StringUtils.isBlank(updateKey)) {
- query = parseUpdateKey(updateKey, (Map)doc);
+ if (StringUtils.isNotBlank(updateKey)) {
+ updateQuery = parseUpdateKey(updateKey, (Map)doc);
removeUpdateKeys(updateKey, (Map)doc);
} else {
- query = Document.parse(filterQuery);
+ updateQuery = Document.parse(filterQuery);
}
-
- if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
- collection.replaceOne(query, (Document)doc, new
ReplaceOptions().upsert(upsert));
+ UpdateResult updateResult;
+ if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) {
+ updateResult = collection.replaceOne(updateQuery,
(Document)doc, new ReplaceOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject)doc;
update.remove(updateKey);
- collection.updateOne(query, update, new
UpdateOptions().upsert(upsert));
+ UpdateOptions updateOptions = new
UpdateOptions().upsert(upsert);
+ PropertyValue updateQueryMode =
context.getProperty(MONGO_UPDATE_MODE);
+
+ if
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_ONE,
updateQueryMode, flowFile)) {
+ updateResult = collection.updateOne(updateQuery,
update, updateOptions);
+ } else if
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_MANY,
updateQueryMode, flowFile)) {
+ updateResult = collection.updateMany(updateQuery,
update,updateOptions);
+ } else {
+ String flowfileUpdateMode =
flowFile.getAttribute(PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE);
+ throw new ProcessException("Unrecognized '" +
PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode
+ "'");
+ }
+ }
+ flowFile = session.putAttribute(flowFile,
ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
Review Comment:
```suggestion
}
flowFile = session.putAttribute(flowFile,
ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
```
##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java:
##########
@@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
session.read(flowFile, in -> StreamUtils.fillBuffer(in, content,
true));
// parse
- final Object doc = (mode.equals(MODE_INSERT) ||
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+ final Object doc = (processorMode.equals(MODE_INSERT) ||
(processorMode.equals(MODE_UPDATE) &&
flowfileType.equals(UPDATE_WITH_DOC.getValue())))
? Document.parse(new String(content, charset)) :
BasicDBObject.parse(new String(content, charset));
- if (MODE_INSERT.equalsIgnoreCase(mode)) {
+ if (MODE_INSERT.equalsIgnoreCase(processorMode)) {
collection.insertOne((Document)doc);
logger.info("inserted {} into MongoDB", new Object[] {
flowFile });
} else {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey =
context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String filterQuery =
context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
- final Document query;
+ final Document updateQuery;
- if (!StringUtils.isBlank(updateKey)) {
- query = parseUpdateKey(updateKey, (Map)doc);
+ if (StringUtils.isNotBlank(updateKey)) {
+ updateQuery = parseUpdateKey(updateKey, (Map)doc);
removeUpdateKeys(updateKey, (Map)doc);
} else {
- query = Document.parse(filterQuery);
+ updateQuery = Document.parse(filterQuery);
}
-
- if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
- collection.replaceOne(query, (Document)doc, new
ReplaceOptions().upsert(upsert));
+ UpdateResult updateResult;
+ if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) {
+ updateResult = collection.replaceOne(updateQuery,
(Document)doc, new ReplaceOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject)doc;
update.remove(updateKey);
- collection.updateOne(query, update, new
UpdateOptions().upsert(upsert));
+ UpdateOptions updateOptions = new
UpdateOptions().upsert(upsert);
+ PropertyValue updateQueryMode =
context.getProperty(MONGO_UPDATE_MODE);
+
+ if
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_ONE,
updateQueryMode, flowFile)) {
+ updateResult = collection.updateOne(updateQuery,
update, updateOptions);
+ } else if
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_MANY,
updateQueryMode, flowFile)) {
+ updateResult = collection.updateMany(updateQuery,
update,updateOptions);
+ } else {
+ String flowfileUpdateMode =
flowFile.getAttribute(PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE);
+ throw new ProcessException("Unrecognized '" +
PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode
+ "'");
+ }
+ }
+ flowFile = session.putAttribute(flowFile,
ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
+ flowFile = session.putAttribute(flowFile,
ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount()));
+ BsonValue upsertedId = updateResult.getUpsertedId();
+ if (upsertedId != null) {
+ String id = upsertedId.isString()?
upsertedId.asString().getValue(): upsertedId.asObjectId().getValue().toString();
Review Comment:
```suggestion
String id = upsertedId.isString() ?
upsertedId.asString().getValue() :
upsertedId.asObjectId().getValue().toString();
```
##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java:
##########
@@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
session.read(flowFile, in -> StreamUtils.fillBuffer(in, content,
true));
// parse
- final Object doc = (mode.equals(MODE_INSERT) ||
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+ final Object doc = (processorMode.equals(MODE_INSERT) ||
(processorMode.equals(MODE_UPDATE) &&
flowfileType.equals(UPDATE_WITH_DOC.getValue())))
? Document.parse(new String(content, charset)) :
BasicDBObject.parse(new String(content, charset));
- if (MODE_INSERT.equalsIgnoreCase(mode)) {
+ if (MODE_INSERT.equalsIgnoreCase(processorMode)) {
Review Comment:
Pick either `.equals` or `.equalsIgnoreCase` for both lines. I am leaning
though towards `.equals`.
##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java:
##########
@@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
session.read(flowFile, in -> StreamUtils.fillBuffer(in, content,
true));
// parse
- final Object doc = (mode.equals(MODE_INSERT) ||
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+ final Object doc = (processorMode.equals(MODE_INSERT) ||
(processorMode.equals(MODE_UPDATE) &&
flowfileType.equals(UPDATE_WITH_DOC.getValue())))
? Document.parse(new String(content, charset)) :
BasicDBObject.parse(new String(content, charset));
- if (MODE_INSERT.equalsIgnoreCase(mode)) {
+ if (MODE_INSERT.equalsIgnoreCase(processorMode)) {
collection.insertOne((Document)doc);
logger.info("inserted {} into MongoDB", new Object[] {
flowFile });
} else {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey =
context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String filterQuery =
context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
- final Document query;
+ final Document updateQuery;
- if (!StringUtils.isBlank(updateKey)) {
- query = parseUpdateKey(updateKey, (Map)doc);
+ if (StringUtils.isNotBlank(updateKey)) {
+ updateQuery = parseUpdateKey(updateKey, (Map)doc);
removeUpdateKeys(updateKey, (Map)doc);
} else {
- query = Document.parse(filterQuery);
+ updateQuery = Document.parse(filterQuery);
}
-
- if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
- collection.replaceOne(query, (Document)doc, new
ReplaceOptions().upsert(upsert));
+ UpdateResult updateResult;
+ if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) {
+ updateResult = collection.replaceOne(updateQuery,
(Document)doc, new ReplaceOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject)doc;
update.remove(updateKey);
- collection.updateOne(query, update, new
UpdateOptions().upsert(upsert));
+ UpdateOptions updateOptions = new
UpdateOptions().upsert(upsert);
+ PropertyValue updateQueryMode =
context.getProperty(MONGO_UPDATE_MODE);
+
+ if
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_ONE,
updateQueryMode, flowFile)) {
+ updateResult = collection.updateOne(updateQuery,
update, updateOptions);
+ } else if
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_MANY,
updateQueryMode, flowFile)) {
+ updateResult = collection.updateMany(updateQuery,
update,updateOptions);
Review Comment:
```suggestion
updateResult = collection.updateMany(updateQuery,
update, updateOptions);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]