pareshsarafmdb commented on a change in pull request #14927:
URL: https://github.com/apache/beam/pull/14927#discussion_r654112756
##########
File path:
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
* .withNumSplits(30))
*
* }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * * .apply(...)
+ * * .apply(MongoDbIO.write()
+ * * .withUri("mongodb://localhost:27017")
+ * * .withDatabase("my-database")
+ * * .withCollection("my-collection")
+ * * .withIsUpdate(true)
+ * * .withUpdateKey("key-to-match")
+ * * .withUpdateField("field-to-update")
+ * * .withUpdateOperator("$set")
+ * * .withNumSplits(30))
Review comment:
Thanks for the clarification! This makes sense. But in a real-time sync (CDC)
scenario, incoming rows will mostly be added to an array or set within a MongoDB
document. For example, say we have a customer collection: new transactions made
by a customer are captured and should be appended to the transactions array
within that customer's document. So we definitely need an option to update with
the full document. I think we can add both options: updating all the fields
together, and updating them one by one. That will make the connector more
feature-rich as well. What do you think?
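A minimal sketch of the two update shapes discussed above, using plain Java maps in place of BSON documents so it runs without the MongoDB driver. The class and method names (`UpdateModes`, `matchById`, `setAllFields`, `pushToArray`) are hypothetical and not part of MongoDbIO; it also assumes the customer document's `_id` equals the row's key field. Only `$set` and `$push` are standard MongoDB update operators.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not MongoDbIO code: two ways a MongoDB sink could turn an
 * incoming row into an update. Plain maps stand in for BSON documents.
 */
public class UpdateModes {

  /** Filter matching the target document whose _id equals the row's key field (key must be non-null). */
  public static Map<String, Object> matchById(Map<String, Object> row, String keyField) {
    return Map.of("_id", row.get(keyField));
  }

  /** "All fields together": copy every non-key field of the row into a single $set. */
  public static Map<String, Object> setAllFields(Map<String, Object> row, String keyField) {
    Map<String, Object> fields = new LinkedHashMap<>(row);
    fields.remove(keyField); // the key is used by the filter, not rewritten
    Map<String, Object> update = new LinkedHashMap<>();
    update.put("$set", fields);
    return update;
  }

  /** "Full document into an array": append the whole row to an array field with $push. */
  public static Map<String, Object> pushToArray(Map<String, Object> row, String arrayField) {
    Map<String, Object> update = new LinkedHashMap<>();
    update.put("$push", Map.of(arrayField, row));
    return update;
  }

  public static void main(String[] args) {
    Map<String, Object> txn = new LinkedHashMap<>();
    txn.put("customerId", 42);
    txn.put("amount", 19.99);
    // Filter: which customer document to touch (assumes _id == customerId).
    System.out.println(matchById(txn, "customerId"));
    // Update: append the transaction to that customer's "transactions" array.
    System.out.println(pushToArray(txn, "transactions"));
  }
}
```

In the CDC example, each captured transaction row would produce one filter plus one `$push` update, while `setAllFields` covers the case where the row should overwrite the matching document's fields in one `$set`.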
##########
File path:
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -766,6 +800,14 @@ private MongoClient createClient(Read spec) {
abstract Builder setBatchSize(long batchSize);
+ abstract Builder setIsUpdate(boolean isUpdate);
Review comment:
Thank you for the feedback. This should make it easy and clean. I will
work on it.
##########
File path:
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##########
@@ -106,6 +109,28 @@
* .withNumSplits(30))
*
* }</pre>
+ *
+ * *
+ *
+ * <p>To configure a MongoDB sink and update, you must specify a connection {@code URI}, a {@code
+ * Database} * name, a {@code Collection} name. It matches the key with _id in target collection.
+ * For instance: * *
+ *
+ * <pre>{@code
+ * * pipeline
+ * * .apply(...)
+ * * .apply(MongoDbIO.write()
+ * * .withUri("mongodb://localhost:27017")
+ * * .withDatabase("my-database")
+ * * .withCollection("my-collection")
+ * * .withIsUpdate(true)
+ * * .withUpdateKey("key-to-match")
+ * * .withUpdateField("field-to-update")
+ * * .withUpdateOperator("$set")
+ * * .withNumSplits(30))
Review comment:
Another challenge: right now your example assumes that the field/column name
in the source is the same as the field name to be updated in the target. In
most cases they will differ, so it might look like this:
.withUpdateKey("name")
.withUpdateFields(
    UpdateField.of("$set", "status", "dest-status-field-in-mdb"),
    UpdateField.of("$currentDate", "lastUpdated", "dest-lastUpdated-field-in-mdb"),
    UpdateField.of("$set", "age", "dest-age-field-in-mdb")))
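A minimal sketch of how the proposed `UpdateField.of(operator, sourceField, destField)` mapping could be turned into one update document per row, grouping target fields under their operator. `UpdateField` and `buildUpdate` are hypothetical (the comment only proposes the `UpdateField.of` shape), and plain Java maps stand in for BSON. The `$currentDate` special case reflects that MongoDB's `$currentDate` takes `true` (or a `$type` spec) rather than a value copied from the source row.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpdateFieldSketch {

  /** Hypothetical value type: one operator applied to one source field, written to one target field. */
  public static final class UpdateField {
    final String updateOperator;
    final String sourceField;
    final String destField;

    private UpdateField(String updateOperator, String sourceField, String destField) {
      this.updateOperator = updateOperator;
      this.sourceField = sourceField;
      this.destField = destField;
    }

    public static UpdateField of(String updateOperator, String sourceField, String destField) {
      return new UpdateField(updateOperator, sourceField, destField);
    }
  }

  /** Builds e.g. {"$set": {dest: value, ...}, "$currentDate": {dest: true}} from one source row. */
  public static Map<String, Map<String, Object>> buildUpdate(
      Map<String, Object> row, List<UpdateField> fields) {
    Map<String, Map<String, Object>> update = new LinkedHashMap<>();
    for (UpdateField f : fields) {
      // $currentDate takes a flag, not a value; every other operator copies the source value.
      Object value =
          "$currentDate".equals(f.updateOperator) ? Boolean.TRUE : row.get(f.sourceField);
      update.computeIfAbsent(f.updateOperator, op -> new LinkedHashMap<>()).put(f.destField, value);
    }
    return update;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("name", "alice");
    row.put("status", "active");
    row.put("age", 31);
    List<UpdateField> fields = List.of(
        UpdateField.of("$set", "status", "dest-status-field-in-mdb"),
        UpdateField.of("$currentDate", "lastUpdated", "dest-lastUpdated-field-in-mdb"),
        UpdateField.of("$set", "age", "dest-age-field-in-mdb"));
    // The update key ("name") would go into the filter; only the update part is built here.
    System.out.println(buildUpdate(row, fields));
  }
}
```

Grouping by operator means both `$set` entries end up under a single `$set` key, which is how a MongoDB update document expresses several fields changed with the same operator.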
--
This is an automated message from the Apache Git Service.