[ 
https://issues.apache.org/jira/browse/BEAM-6212?focusedWorklogId=177941&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177941
 ]

ASF GitHub Bot logged work on BEAM-6212:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Dec/18 11:58
            Start Date: 21/Dec/18 11:58
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #7256: [BEAM-6212] 
MongoDbIO add ordered option (inserts documents even if errors)
URL: https://github.com/apache/beam/pull/7256
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
 
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 309a30d582b7..26828125a9e4 100644
--- 
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ 
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -23,12 +23,14 @@
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.mongodb.BasicDBObject;
+import com.mongodb.MongoBulkWriteException;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -119,6 +121,7 @@ public static Write write() {
         .setSslEnabled(false)
         .setIgnoreSSLCertificate(false)
         .setSslInvalidHostNameAllowed(false)
+        .setOrdered(true)
         .build();
   }
 
@@ -612,6 +615,8 @@ public void close() {
 
     abstract boolean ignoreSSLCertificate();
 
+    abstract boolean ordered();
+
     @Nullable
     abstract String database();
 
@@ -636,6 +641,8 @@ public void close() {
 
       abstract Builder setIgnoreSSLCertificate(boolean value);
 
+      abstract Builder setOrdered(boolean value);
+
       abstract Builder setDatabase(String database);
 
       abstract Builder setCollection(String collection);
@@ -705,6 +712,11 @@ public Write withSSLInvalidHostNameAllowed(boolean 
invalidHostNameAllowed) {
       return 
builder().setSslInvalidHostNameAllowed(invalidHostNameAllowed).build();
     }
 
+    /** ordered bulk records. */
+    public Write withOrdered(boolean ordered) {
+      return builder().setOrdered(ordered).build();
+    }
+
     /** Enable ignoreSSLCertificate for ssl for connection (allow for self 
signed ceritificates). */
     public Write withIgnoreSSLCertificate(boolean ignoreSSLCertificate) {
       return builder().setIgnoreSSLCertificate(ignoreSSLCertificate).build();
@@ -746,6 +758,7 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       builder.add(DisplayData.item("sslEnable", sslEnabled()));
       builder.add(DisplayData.item("sslInvalidHostNameAllowed", 
sslInvalidHostNameAllowed()));
       builder.add(DisplayData.item("ignoreSSLCertificate", 
ignoreSSLCertificate()));
+      builder.add(DisplayData.item("ordered", ordered()));
       builder.add(DisplayData.item("database", database()));
       builder.add(DisplayData.item("collection", collection()));
       builder.add(DisplayData.item("batchSize", batchSize()));
@@ -799,7 +812,14 @@ private void flush() {
         }
         MongoDatabase mongoDatabase = client.getDatabase(spec.database());
         MongoCollection<Document> mongoCollection = 
mongoDatabase.getCollection(spec.collection());
-        mongoCollection.insertMany(batch);
+        try {
+          mongoCollection.insertMany(batch, new 
InsertManyOptions().ordered(spec.ordered()));
+        } catch (MongoBulkWriteException e) {
+          if (spec.ordered()) {
+            throw e;
+          }
+        }
+
         batch.clear();
       }
 
diff --git 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 09d8d3a14969..ad6a8612cffc 100644
--- 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -368,4 +368,59 @@ public void testWriteEmptyCollection() throws Exception {
 
     Assert.assertEquals(0, collection.count());
   }
+
+  @Test(expected = Exception.class)
+  public void testWriteDuplicate() throws Exception {
+
+    Document doc =
+        
Document.parse("{\"_id\":\"521df3a4300466f1f2b5ae82\",\"scientist\":\"Test 
%s\"}");
+    ArrayList<Document> data = new ArrayList<>();
+    data.add(doc);
+    data.add(doc);
+
+    pipeline
+        .apply(Create.of(data))
+        .apply(
+            MongoDbIO.write()
+                .withUri("mongodb://localhost:" + port)
+                .withDatabase("test")
+                .withCollection("test"));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testWriteDuplicateIgnore() throws Exception {
+
+    Document doc =
+        
Document.parse("{\"_id\":\"521df3a4300466f1f2b5ae82\",\"scientist\":\"Test 
%s\"}");
+    ArrayList<Document> data = new ArrayList<>();
+    data.add(doc);
+    data.add(doc);
+
+    pipeline
+        .apply(Create.of(data))
+        .apply(
+            MongoDbIO.write()
+                .withUri("mongodb://localhost:" + port)
+                .withDatabase("test")
+                .withOrdered(false)
+                .withCollection("test"));
+
+    pipeline.run();
+
+    MongoClient client = new MongoClient("localhost", port);
+    MongoDatabase database = client.getDatabase("test");
+    MongoCollection collection = database.getCollection("test");
+
+    MongoCursor cursor = collection.find().iterator();
+
+    int count = 0;
+    while (cursor.hasNext()) {
+      count = count + 1;
+      cursor.next();
+    }
+
+    assertEquals(1, count);
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 177941)
    Time Spent: 0.5h  (was: 20m)

> MongoDbIO add ordered option (inserts documents even if errors)
> ---------------------------------------------------------------
>
>                 Key: BEAM-6212
>                 URL: https://issues.apache.org/jira/browse/BEAM-6212
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-mongodb
>    Affects Versions: 2.8.0
>            Reporter: Chaim
>            Assignee: Chaim
>            Priority: Major
>             Fix For: 2.10.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> MongoDB supports the ordered option to allow to continue insertions even on 
> failures. By default (if true) it inserts in order and stops if there is a 
> failure.
> https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#basic



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to