turcsanyip commented on a change in pull request #5482:
URL: https://github.com/apache/nifi/pull/5482#discussion_r744988569
##########
File path:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -57,28 +70,77 @@
"a configured MongoDB collection. This processor does not support
updates, deletes or upserts. The number of documents to insert at a time is
controlled " +
Review comment:
Could you please update the documentation?
It is not correct any more: _"This processor does not support updates,
deletes or upserts."_
##########
File path:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -190,4 +303,49 @@ private List convertArrays(Object[] input) {
return retVal;
}
+
+    private Bson[] buildFilters(Map<String, List<String>> updateKeyFieldPathToFieldChain, Document readyToUpsert) {
+        Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
+            .stream()
+            .map(updateKeyFieldPath__fieldChain -> {
+                String fieldPath = updateKeyFieldPath__fieldChain.getKey();
+                List<String> fieldChain = updateKeyFieldPath__fieldChain.getValue();
+
+                Object value = readyToUpsert;
+                String previousField = null;
+                for (String field : fieldChain) {
+                    if (!(value instanceof Map)) {
+                        throw new ProcessException("field '" + previousField + "' (from field expression '" + fieldPath + "') is not an embedded document");
+                    }
+
+                    value = ((Map) value).get(field);
+
+                    if (value == null) {
+                        throw new ProcessException("field '" + field + "' (from field expression '" + fieldPath + "') has no value");
+                    }
+
+                    previousField = field;
+                }
+
+                Bson filter = Filters.eq(fieldPath, value);
+                return filter;
+            })
+            .collect(Collectors.toList())
+            .toArray(new Bson[0]);
+
+        return filters;
+    }
+
+    private boolean updateModeIs(String updateValueToMatch, ProcessContext context, FlowFile flowFile) {
Review comment:
`updateModeToMatch` ?
##########
File path:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -190,4 +303,49 @@ private List convertArrays(Object[] input) {
return retVal;
}
+
+    private Bson[] buildFilters(Map<String, List<String>> updateKeyFieldPathToFieldChain, Document readyToUpsert) {
+        Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
+            .stream()
+            .map(updateKeyFieldPath__fieldChain -> {
+                String fieldPath = updateKeyFieldPath__fieldChain.getKey();
+                List<String> fieldChain = updateKeyFieldPath__fieldChain.getValue();
+
+                Object value = readyToUpsert;
+                String previousField = null;
+                for (String field : fieldChain) {
+                    if (!(value instanceof Map)) {
+                        throw new ProcessException("field '" + previousField + "' (from field expression '" + fieldPath + "') is not an embedded document");
+                    }
+
+                    value = ((Map) value).get(field);
+
+                    if (value == null) {
+                        throw new ProcessException("field '" + field + "' (from field expression '" + fieldPath + "') has no value");
+                    }
+
+                    previousField = field;
+                }
+
+                Bson filter = Filters.eq(fieldPath, value);
+                return filter;
+            })
+            .collect(Collectors.toList())
+            .toArray(new Bson[0]);
Review comment:
`.toArray(Bson[]::new)` could be used instead of collect + toArray.
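For illustration, a runnable sketch of what the suggestion amounts to; `String` stands in for the driver's `Bson` type and a plain string for `Filters.eq(...)`, so the snippet is self-contained:

```java
import java.util.stream.Stream;

// Minimal sketch: Stream.toArray(IntFunction) builds the array directly,
// making the intermediate List from collect(Collectors.toList()) unnecessary.
public class ToArraySketch {
    static String[] build(String... fieldPaths) {
        return Stream.of(fieldPaths)
            .map(path -> "eq(" + path + ")")   // stand-in for Filters.eq(fieldPath, value)
            .toArray(String[]::new);           // replaces .collect(Collectors.toList()).toArray(new Bson[0])
    }
}
```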
##########
File path:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -135,15 +222,41 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                     for (String name : schema.getFieldNames()) {
                         document.put(name, contentMap.get(name));
                     }
-                    inserts.add(convertArrays(document));
-                    if (inserts.size() == ceiling) {
-                        collection.insertMany(inserts);
-                        added += inserts.size();
-                        inserts = new ArrayList<>();
+                    Document readyToUpsert = convertArrays(document);
+
+                    WriteModel<Document> writeModel;
+                    if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+                        Bson[] filters = buildFilters(updateKeyFieldPathToFieldChain, readyToUpsert);
+
+                        if (updateModeIs(UPDATE_ONE.getValue(), context, flowFile)) {
+                            writeModel = new UpdateOneModel<>(
+                                Filters.and(filters),
+                                new Document("$set", readyToUpsert),
+                                new UpdateOptions().upsert(true)
+                            );
+                        } else if (updateModeIs(UPDATE_MANY.getValue(), context, flowFile)) {
+                            writeModel = new UpdateManyModel<>(
+                                Filters.and(filters),
+                                new Document("$set", readyToUpsert),
+                                new UpdateOptions().upsert(true)
+                            );
+                        } else {
+                            String flowfileUpdateMode = flowFile.getAttribute("mongo.update.mode");
+                            throw new ProcessException("Unrecognized 'mongo.update.mode' value '" + flowfileUpdateMode + "'");
Review comment:
An unhandled `ProcessException` simply pushes the FlowFile back onto the input queue. It should be routed to `failure` in this case. The same applies to the `buildFilters()` method.
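A minimal sketch of the pattern being asked for, with hypothetical stand-ins rather than the actual NiFi API: catch the per-FlowFile error and choose the failure relationship, instead of letting the exception propagate and roll the FlowFile back onto the input queue:

```java
// Hypothetical stand-ins for the processor's relationships and per-FlowFile work.
public class RouteOnFailureSketch {
    static final String REL_SUCCESS = "success";
    static final String REL_FAILURE = "failure";

    // Stand-in for the per-record work; throws on an unrecognized update mode.
    static void process(String updateMode) {
        if (!updateMode.equals("one") && !updateMode.equals("many")) {
            throw new IllegalArgumentException(
                "Unrecognized 'mongo.update.mode' value '" + updateMode + "'");
        }
    }

    // Catch the error and pick the failure relationship instead of rethrowing,
    // so the FlowFile is not rolled back to the input queue.
    static String route(String updateMode) {
        try {
            process(updateMode);
            return REL_SUCCESS;
        } catch (IllegalArgumentException e) {
            return REL_FAILURE;
        }
    }
}
```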
##########
File path:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -190,4 +303,49 @@ private List convertArrays(Object[] input) {
return retVal;
}
+
+    private Bson[] buildFilters(Map<String, List<String>> updateKeyFieldPathToFieldChain, Document readyToUpsert) {
+        Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
+            .stream()
+            .map(updateKeyFieldPath__fieldChain -> {
+                String fieldPath = updateKeyFieldPath__fieldChain.getKey();
+                List<String> fieldChain = updateKeyFieldPath__fieldChain.getValue();
+
+                Object value = readyToUpsert;
+                String previousField = null;
+                for (String field : fieldChain) {
+                    if (!(value instanceof Map)) {
+                        throw new ProcessException("field '" + previousField + "' (from field expression '" + fieldPath + "') is not an embedded document");
+                    }
+
+                    value = ((Map) value).get(field);
+
+                    if (value == null) {
+                        throw new ProcessException("field '" + field + "' (from field expression '" + fieldPath + "') has no value");
+                    }
+
+                    previousField = field;
+                }
+
+                Bson filter = Filters.eq(fieldPath, value);
+                return filter;
+            })
+            .collect(Collectors.toList())
+            .toArray(new Bson[0]);
+
+        return filters;
+    }
+
+    private boolean updateModeIs(String updateValueToMatch, ProcessContext context, FlowFile flowFile) {
+        String updateMode = context.getProperty(UPDATE_MODE).getValue();
+
+        boolean updateMadeMatches = updateMode.equals(updateValueToMatch)
Review comment:
Typo: `updateModeMatches`
##########
File path:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -118,15 +184,36 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final WriteConcern writeConcern = getWriteConcern(context);
-        List<Document> inserts = new ArrayList<>();
         int ceiling = context.getProperty(INSERT_COUNT).asInteger();
-        int added = 0;
+        int written = 0;
         boolean error = false;
-        try (final InputStream inStream = session.read(flowFile);
-             final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
-            final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
+        boolean ordered = context.getProperty(ORDERED).asBoolean();
+        boolean bypass = context.getProperty(BYPASS_VALIDATION).asBoolean();
+
+        Map<String, List<String>> updateKeyFieldPathToFieldChain = new LinkedHashMap<>();
+        if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+            Arrays.stream(context.getProperty(UPDATE_KEY_FIELDS).getValue().split("\\s*,\\s*"))
+                .forEach(updateKeyField -> updateKeyFieldPathToFieldChain.put(
+                    updateKeyField,
+                    Arrays.asList(updateKeyField.split("\\."))
+                ));
+        }
Review comment:
This `updateKeyFieldPathToFieldChain` map is only used in `buildFilters()`.
The raw property value could be passed in instead, and `fieldChain` could simply be calculated there, which would result in more readable code:
```
private Bson[] buildFilters(String updateKeyFields, Document readyToUpsert) {
    Bson[] filters = Arrays.stream(updateKeyFields.split("\\s*,\\s*"))
        .map(fieldPath -> {
            String[] fieldChain = fieldPath.split("\\.");
            Object value = readyToUpsert;
            ...
```
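A runnable sketch of the suggested shape: plain `Map`s stand in for `org.bson.Document`, and a `path=value` string stands in for `Filters.eq`, so this runs without the Mongo driver:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the refactor: pass the raw comma-separated property value and
// derive each field chain inside the method by splitting on dots, then walk
// the nested maps to resolve the key value for the filter.
public class UpdateKeyFieldsSketch {
    static List<String> buildFilters(String updateKeyFields, Map<String, ?> document) {
        return Arrays.stream(updateKeyFields.split("\\s*,\\s*"))
            .map(fieldPath -> {
                Object value = document;
                for (String field : fieldPath.split("\\.")) {
                    if (!(value instanceof Map)) {
                        throw new IllegalStateException(
                            "'" + fieldPath + "' is not an embedded document");
                    }
                    value = ((Map<?, ?>) value).get(field);
                }
                return fieldPath + "=" + value;   // stand-in for Filters.eq(fieldPath, value)
            })
            .collect(Collectors.toList());
    }
}
```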
##########
File path:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
##########
@@ -57,28 +70,77 @@
         "a configured MongoDB collection. This processor does not support updates, deletes or upserts. The number of documents to insert at a time is controlled " +
         "by the \"Insert Batch Size\" configuration property. This value should be set to a reasonable size to ensure " +
         "that MongoDB is not overloaded with too many inserts at once.")
+@ReadsAttribute(
+    attribute = "mongodb.update.mode",
+    description = "Configurable parameter for controlling update mode on a per-flowfile basis." +
+        " Acceptable values are 'one' and 'many' and controls whether a single incoming record should update a single or multiple Mongo documents."
+)
 public class PutMongoRecord extends AbstractMongoProcessor {
     static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
         .description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
     static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
         .description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
+    static final AllowableValue UPDATE_ONE = new AllowableValue("one", "Update One", "Updates only the first document that matches the query.");
+    static final AllowableValue UPDATE_MANY = new AllowableValue("many", "Update Many", "Updates every document that matches the query.");
+    static final AllowableValue UPDATE_FF_ATTRIBUTE = new AllowableValue("flowfile-attribute", "Use 'mongodb.update.mode' flowfile attribute.",
+        "Use the value of the 'mongodb.update.mode' attribute of the incoming flowfile. Acceptable values are 'one' and 'many'.");
+
     static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
         .name("record-reader")
         .displayName("Record Reader")
         .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
         .identifiesControllerService(RecordReaderFactory.class)
         .required(true)
         .build();
+
     static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder()
         .name("insert_count")
-        .displayName("Insert Batch Size")
-        .description("The number of records to group together for one single insert operation against MongoDB.")
+        .displayName("Batch Size")
+        .description("The number of records to group together for one single insert/upsert operation against MongoDB.")
         .defaultValue("100")
         .required(true)
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .build();
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+        .name("ordered")
+        .displayName("Ordered")
+        .description("Perform ordered or unordered operations")
+        .allowableValues("True", "False")
+        .defaultValue("False")
+        .required(true)
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor BYPASS_VALIDATION = new PropertyDescriptor.Builder()
+        .name("bypass-validation")
+        .displayName("Bypass Validation")
+        .description("Bypass schema validation during insert/update")
+        .allowableValues("True", "False")
+        .defaultValue("True")
+        .required(true)
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor UPDATE_KEY_FIELDS = new PropertyDescriptor.Builder()
+        .name("update-key-fields")
+        .displayName("Update Key Fields")
+        .description("Comma separated list of fields that uniquely identifies a document. If this property is set NiFi will attempt an upsert operation on all documents." +
Review comment:
"uniquely" is not correct. `Update Many` mode would not make sense in
that case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]