This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb56b7  Merge pull request #14927 from [BEAM-12400] MongoDBIO support 
for update within documents
bcb56b7 is described below

commit bcb56b704610e912a74eb0718b68d2a6a0ca0ccf
Author: pareshsarafmdb <[email protected]>
AuthorDate: Mon Sep 6 10:27:47 2021 +0530

    Merge pull request #14927 from [BEAM-12400] MongoDBIO support for update 
within documents
    
    * Did changes for ability to update the existing documents using MongoDBIO 
write
    
    * fixes related to formatting and mongodb update
    
    * Updated javadoc for update enhancement
    
    * address review comments - MongoDBIO update connector
    
    * MongoDBIO - changed UpdateField to be autovalue class
    
    * addressed review comments - javadoc, updatefield refactoring
---
 .../org/apache/beam/sdk/io/mongodb/MongoDbIO.java  | 101 ++++++++++++++++++++-
 .../beam/sdk/io/mongodb/UpdateConfiguration.java   |  82 +++++++++++++++++
 .../apache/beam/sdk/io/mongodb/UpdateField.java    |  71 +++++++++++++++
 .../apache/beam/sdk/io/mongodb/MongoDbIOTest.java  |  39 +++++++-
 4 files changed, 286 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
 
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index fa0065e..8c7f03b 100644
--- 
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ 
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -31,11 +31,17 @@ import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BulkWriteOptions;
 import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.InsertManyOptions;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import javax.net.ssl.SSLContext;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -93,8 +99,8 @@ import org.slf4j.LoggerFactory;
  *
  * <p>MongoDB sink supports writing of Document (as JSON String) in a MongoDB.
  *
- * <p>To configure a MongoDB sink, you must specify a connection {@code URI}, 
a {@code Database}
- * name, a {@code Collection} name. For instance:
+ * <p>To configure a MongoDB sink and insert/replace, you must specify a 
connection {@code URI}, a
+ * {@code Database} name, a {@code Collection} name. For instance:
  *
  * <pre>{@code
  * pipeline
@@ -106,6 +112,27 @@ import org.slf4j.LoggerFactory;
  *     .withNumSplits(30))
  *
  * }</pre>
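+ *
+ * <p>To configure a MongoDB sink that updates documents, you must specify a connection
+ * {@code URI}, a {@code Database} name, a {@code Collection} name, and an
+ * {@code UpdateConfiguration}. For each input document, the value of the configured update key
+ * is matched against {@code _id} in the target collection, and the configured update fields are
+ * applied to the matching document. For instance: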
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(MongoDbIO.write()
+ *     .withUri("mongodb://localhost:27017")
+ *     .withDatabase("my-database")
+ *     .withCollection("my-collection")
+ *     .withUpdateConfiguration(UpdateConfiguration.create().withUpdateKey("key1")
+ *     .withUpdateFields(UpdateField.fieldUpdate("$set", "source-field1", "dest-field1"),
+ *                       UpdateField.fieldUpdate("$set", "source-field2", "dest-field2"),
+ *                       // pushes entire input doc to the dest field
+ *                       UpdateField.fullUpdate("$push", "dest-field3"))));
+ *
+ * }</pre>
  */
 @Experimental(Kind.SOURCE_SINK)
 @SuppressWarnings({
@@ -490,7 +517,6 @@ public class MongoDbIO {
               .anyMatch(s -> s.keySet().contains("$limit"))) {
             return Collections.singletonList(this);
           }
-
           splitKeys = buildAutoBuckets(mongoDatabase, spec);
 
           for (BsonDocument shardFilter : splitKeysToMatch(splitKeys)) {
@@ -744,6 +770,8 @@ public class MongoDbIO {
 
     abstract long batchSize();
 
+    abstract @Nullable UpdateConfiguration updateConfiguration();
+
     abstract Builder builder();
 
     @AutoValue.Builder
@@ -766,6 +794,8 @@ public class MongoDbIO {
 
       abstract Builder setBatchSize(long batchSize);
 
+      abstract Builder setUpdateConfiguration(UpdateConfiguration 
updateConfiguration);
+
       abstract Write build();
     }
 
@@ -856,6 +886,10 @@ public class MongoDbIO {
       return builder().setBatchSize(batchSize).build();
     }
 
+    public Write withUpdateConfiguration(UpdateConfiguration 
updateConfiguration) {
+      return builder().setUpdateConfiguration(updateConfiguration).build();
+    }
+
     @Override
     public PDone expand(PCollection<Document> input) {
       checkArgument(uri() != null, "withUri() is required");
@@ -910,6 +944,7 @@ public class MongoDbIO {
       public void processElement(ProcessContext ctx) {
         // Need to copy the document because mongoCollection.insertMany() will 
mutate it
         // before inserting (will assign an id).
+
         batch.add(new Document(ctx.element()));
         if (batch.size() >= spec.batchSize()) {
           flush();
@@ -927,6 +962,15 @@ public class MongoDbIO {
         }
         MongoDatabase mongoDatabase = client.getDatabase(spec.database());
         MongoCollection<Document> mongoCollection = 
mongoDatabase.getCollection(spec.collection());
+        if (spec.updateConfiguration() == null) {
+          insertDocuments(mongoCollection);
+        } else {
+          updateDocuments(mongoCollection);
+        }
+        batch.clear();
+      }
+
+      private void insertDocuments(MongoCollection<Document> mongoCollection) {
         try {
           mongoCollection.insertMany(batch, new 
InsertManyOptions().ordered(spec.ordered()));
         } catch (MongoBulkWriteException e) {
@@ -934,8 +978,57 @@ public class MongoDbIO {
             throw e;
           }
         }
+      }
+
+      private void updateDocuments(MongoCollection<Document> mongoCollection) {
+        if (batch.isEmpty()) {
+          return;
+        }
+        List<WriteModel<Document>> actions = new ArrayList<>();
+        @Nullable List<UpdateField> updateFields = 
spec.updateConfiguration().updateFields();
+        Map<String, List<UpdateField>> operatorFieldsMap = 
getOperatorFieldsMap(updateFields);
+        try {
+          for (Document doc : batch) {
+            Document updateDocument = new Document();
+            for (Map.Entry<String, List<UpdateField>> entry : 
operatorFieldsMap.entrySet()) {
+              Document updateSubDocument = new Document();
+              for (UpdateField field : entry.getValue()) {
+                updateSubDocument.append(
+                    field.destField(),
+                    field.sourceField() == null ? doc : 
doc.get(field.sourceField()));
+              }
+              updateDocument.append(entry.getKey(), updateSubDocument);
+            }
+            Document findCriteria =
+                new Document("_id", 
doc.get(spec.updateConfiguration().updateKey()));
+            UpdateOptions updateOptions =
+                new 
UpdateOptions().upsert(spec.updateConfiguration().isUpsert());
+            actions.add(new UpdateOneModel<>(findCriteria, updateDocument, 
updateOptions));
+          }
+          mongoCollection.bulkWrite(actions, new 
BulkWriteOptions().ordered(spec.ordered()));
+        } catch (MongoBulkWriteException e) {
+          if (spec.ordered()) {
+            throw e;
+          }
+        }
+      }
 
-        batch.clear();
+      private static Map<String, List<UpdateField>> getOperatorFieldsMap(
+          List<UpdateField> updateFields) {
+        Map<String, List<UpdateField>> operatorFieldsMap = new HashMap<>();
+        for (UpdateField field : updateFields) {
+          String updateOperator = field.updateOperator();
+          if (operatorFieldsMap.containsKey(updateOperator)) {
+            List<UpdateField> fields = operatorFieldsMap.get(updateOperator);
+            fields.add(field);
+            operatorFieldsMap.put(updateOperator, fields);
+          } else {
+            List<UpdateField> fields = new ArrayList<>();
+            fields.add(field);
+            operatorFieldsMap.put(updateOperator, fields);
+          }
+        }
+        return operatorFieldsMap;
       }
 
       @Teardown
diff --git 
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
 
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
new file mode 100644
index 0000000..cda037d
--- /dev/null
+++ 
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Builds a MongoDB UpdateConfiguration object. */
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public abstract class UpdateConfiguration implements Serializable {
+
+  abstract @Nullable String updateKey();
+
+  abstract @Nullable List<UpdateField> updateFields();
+
+  abstract boolean isUpsert();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateConfiguration.Builder()
+        .setUpdateFields(Collections.emptyList())
+        .setIsUpsert(false);
+  }
+
+  abstract Builder toBuilder();
+
+  public static UpdateConfiguration create() {
+    return builder().build();
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setUpdateFields(@Nullable List<UpdateField> updateFields);
+
+    abstract Builder setUpdateKey(@Nullable String updateKey);
+
+    abstract Builder setIsUpsert(boolean isUpsert);
+
+    abstract UpdateConfiguration build();
+  }
+
+  /**
+   * Sets the configurations for multiple updates. Each {@link UpdateField} carries an update
+   * operator, an optional source field name, and a destination field name.
+   */
+  public UpdateConfiguration withUpdateFields(UpdateField... updateFields) {
+    return toBuilder().setUpdateFields(Arrays.asList(updateFields)).build();
+  }
+
+  /** Sets the name of the input document field whose value is matched against {@code _id}. */
+  public UpdateConfiguration withUpdateKey(String updateKey) {
+    return toBuilder().setUpdateKey(updateKey).build();
+  }
+
+  public UpdateConfiguration withIsUpsert(boolean isUpsert) {
+    return toBuilder().setIsUpsert(isUpsert).build();
+  }
+}
diff --git 
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
 
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
new file mode 100644
index 0000000..a1c8423
--- /dev/null
+++ 
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+  abstract @Nullable String updateOperator();
+
+  abstract @Nullable String sourceField();
+
+  abstract @Nullable String destField();
+
+  private static Builder builder() {
+    return new AutoValue_UpdateField.Builder().setSourceField(null);
+  }
+
+  abstract UpdateField.Builder toBuilder();
+
+  private static UpdateField create() {
+    return builder().build();
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract UpdateField.Builder setUpdateOperator(@Nullable String 
updateOperator);
+
+    abstract UpdateField.Builder setSourceField(@Nullable String sourceField);
+
+    abstract UpdateField.Builder setDestField(@Nullable String destField);
+
+    abstract UpdateField build();
+  }
+
+  /** Creates an update whose value for {@code destField} is the entire input document. */
+  public static UpdateField fullUpdate(String updateOperator, String 
destField) {
+    return 
create().toBuilder().setUpdateOperator(updateOperator).setDestField(destField).build();
+  }
+
+  public static UpdateField fieldUpdate(
+      String updateOperator, String sourceField, String destField) {
+    return create()
+        .toBuilder()
+        .setUpdateOperator(updateOperator)
+        .setSourceField(sourceField)
+        .setDestField(destField)
+        .build();
+  }
+}
diff --git 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 89af792..3e2b62c 100644
--- 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -103,7 +103,7 @@ public class MongoDbIOTest {
     client = new MongoClient("localhost", port);
 
     LOG.info("Insert test data");
-    List<Document> documents = createDocuments(1000);
+    List<Document> documents = createDocuments(1000, false);
     MongoCollection<Document> collection = getCollection(COLLECTION);
     collection.insertMany(documents);
   }
@@ -329,7 +329,7 @@ public class MongoDbIOTest {
     final int numElements = 1000;
 
     pipeline
-        .apply(Create.of(createDocuments(numElements)))
+        .apply(Create.of(createDocuments(numElements, false)))
         .apply(
             MongoDbIO.write()
                 .withUri("mongodb://localhost:" + port)
@@ -361,7 +361,37 @@ public class MongoDbIOTest {
     assertEquals(1, countElements(collectionName));
   }
 
-  private static List<Document> createDocuments(final int n) {
+  @Test
+  public void testUpdate() {
+    final String collectionName = "testUpdate";
+    final int numElements = 100;
+    Document doc = 
Document.parse("{\"id\":1,\"scientist\":\"Updated\",\"country\":\"India\"}");
+
+    getCollection(collectionName).insertMany(createDocuments(numElements, 
true));
+    assertEquals(numElements, countElements(collectionName));
+    List<Document> docs = new ArrayList<>();
+    docs.add(doc);
+    pipeline
+        .apply(Create.of(docs))
+        .apply(
+            MongoDbIO.write()
+                .withUri("mongodb://localhost:" + port)
+                .withDatabase(DATABASE)
+                .withCollection(collectionName)
+                .withUpdateConfiguration(
+                    UpdateConfiguration.create()
+                        .withUpdateKey("id")
+                        .withUpdateFields(
+                            UpdateField.fieldUpdate("$set", "scientist", 
"scientist"),
+                            UpdateField.fieldUpdate("$set", "country", 
"country"))));
+    pipeline.run();
+
+    Document out = getCollection(collectionName).find(new Document("_id", 
1)).first();
+    assertEquals("Updated", out.get("scientist"));
+    assertEquals("India", out.get("country"));
+  }
+
+  private static List<Document> createDocuments(final int n, boolean addId) {
     final String[] scientists =
         new String[] {
           "Einstein",
@@ -392,6 +422,9 @@ public class MongoDbIOTest {
     for (int i = 1; i <= n; i++) {
       int index = i % scientists.length;
       Document document = new Document();
+      if (addId) {
+        document.append("_id", i);
+      }
       document.append("scientist", scientists[index]);
       document.append("country", country[index]);
       documents.add(document);

Reply via email to