pabloem commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r652150217
##########
File path:
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -856,6 +898,22 @@ public Write withBatchSize(long batchSize) {
return builder().setBatchSize(batchSize).build();
}
+ public Write withIsUpdate(boolean isUpdate) {
+ return builder().setIsUpdate(isUpdate).build();
+ }
+
+ public Write withUpdateKey(String updateKey) {
+ return builder().setUpdateKey(updateKey).build();
+ }
+
+ public Write withUpdateOperator(String updateOperator) {
+ return builder().setUpdateOperator(updateOperator).build();
+ }
+
+ public Write withUpdateField(String updateField) {
+ return builder().setUpdateField(updateField).build();
+ }
+
Review comment:
Please also add javadoc to each public method
##########
File path:
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -877,6 +935,11 @@ public void populateDisplayData(DisplayData.Builder
builder) {
builder.add(DisplayData.item("database", database()));
builder.add(DisplayData.item("collection", collection()));
builder.add(DisplayData.item("batchSize", batchSize()));
+ // builder.add(DisplayData.item("isUpdate", isUpdate()));
+ // builder.add(DisplayData.item("updateKey", updateKey()));
+ // builder.add(DisplayData.item("updateOperator", updateOperator()));
+ // builder.add(Data.item("updateOptions", updateOptions()));
+ // builder.add(DisplayData.item("updateField", updateField()));
Review comment:
Uncoment these? Maybe check if they're not null and uncomment I suppose.
##########
File path:
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
* .withNumSplits(30))
*
* }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection
{@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in
target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * * .apply(...)
+ * * .apply(MongoDbIO.write()
+ * * .withUri("mongodb://localhost:27017")
+ * * .withDatabase("my-database")
+ * * .withCollection("my-collection")
+ * * .withIsUpdate(true)
+ * * .withUpdateKey("key-to-match")
+ * * .withUpdateField("field-to-update")
+ * * .withUpdateOperator("$set")
+ * * .withNumSplits(30))
Review comment:
In this case, only one field can be updated. Is that correct? would it
make sense to support updating more than one field?
I see that mongodb works with pairs of `updateOperator`+`updateField`. I
wonder if it makes sense to support something like that instead?
e.g.:
```
pipeline
.apply(...)
.apply(MongoDbIO.write()
.withUri("mongodb://localhost:27017")
.withDatabase("my-database")
.withCollection("my-collection")
.withIsUpdate(true)
.withUpdateKey("key-to-match")
.withUpdateFields(
"$set", "field1",
"$currentDate", "datefield1",
"$min", "special-minimum-field1")
.withNumSplits(30))
```
or perhaps something like this:
```
.withUpdateFields(
UpdateField.of("$set", "field1"),
UpdateField.of("$currentDate", "datefield1"),
UpdateField.of("$min", "special-minimum-field1")))
```
I also think it may be helpful to support _upsert_ operations. WDYT?
I don't want to make this too complex - but it feels like these options
could be supported easily, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]