This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git


The following commit(s) were added to refs/heads/main by this push:
     new e0050b4  [FLINK-37950] Supporting ordered writes & 
`bypassDocumentValidation` behavior for sink writer
e0050b4 is described below

commit e0050b4a0f9bc6b079a0cbe891339b9e9f612b21
Author: Rahul Teke <[email protected]>
AuthorDate: Tue Feb 10 05:16:17 2026 +0000

    [FLINK-37950] Supporting ordered writes & `bypassDocumentValidation` 
behavior for sink writer
---
 docs/content/docs/connectors/datastream/mongodb.md |   8 +-
 docs/content/docs/connectors/table/mongodb.md      |  18 +++-
 .../connector/mongodb/sink/MongoSinkBuilder.java   |  22 ++++
 .../mongodb/sink/config/MongoWriteOptions.java     |  63 +++++++++++-
 .../connector/mongodb/sink/writer/MongoWriter.java |   8 +-
 .../mongodb/table/MongoConnectorOptions.java       |  12 +++
 .../connector/mongodb/sink/MongoSinkITCase.java    | 114 +++++++++++++++++++--
 .../mongodb/sink/writer/MongoWriterITCase.java     |   8 +-
 .../connector/mongodb/testutils/MongoTestUtil.java |  28 ++++-
 9 files changed, 263 insertions(+), 18 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/mongodb.md 
b/docs/content/docs/connectors/datastream/mongodb.md
index b59d605..3e195ab 100644
--- a/docs/content/docs/connectors/datastream/mongodb.md
+++ b/docs/content/docs/connectors/datastream/mongodb.md
@@ -204,7 +204,13 @@ Flink's MongoDB sink is created by using the static 
builder `MongoSink.<InputTyp
 7. _setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee)_
     * Optional. Default: `DeliveryGuarantee.AT_LEAST_ONCE`.
     * Sets the wanted `DeliveryGuarantee`. The `EXACTLY_ONCE` guarantee is not 
supported yet.
-8. __setSerializationSchema(MongoSerializationSchema<InputType> 
serializationSchema)__
+8. _setOrderedWrites(boolean ordered)_
+    * Optional. Default: `true`
+    * Defines MongoDB driver option to perform ordered writes.
+9. _setBypassDocumentValidation(boolean bypassDocumentValidation)_
+    * Optional. Default: `false`
+    * Defines MongoDB driver option to bypass document validation.
+10. __setSerializationSchema(MongoSerializationSchema<InputType> 
serializationSchema)__
     * Required.
     * A `MongoSerializationSchema` is required for parsing input record to 
MongoDB 
       
[WriteModel](https://www.mongodb.com/docs/drivers/java/sync/current/usage-examples/bulkWrite/).
diff --git a/docs/content/docs/connectors/table/mongodb.md 
b/docs/content/docs/connectors/table/mongodb.md
index 29b90d9..0d4f065 100644
--- a/docs/content/docs/connectors/table/mongodb.md
+++ b/docs/content/docs/connectors/table/mongodb.md
@@ -297,7 +297,23 @@ Connector Options
       <td style="word-wrap: break-word;">at-lease-once</td>
       <td><p>Enum</p>Possible values: none, at-least-once</td>
       <td>Optional delivery guarantee when committing. The exactly-once 
guarantee is not supported yet.</td>
-    </tr> 
+    </tr>
+    <tr>
+      <td><h5>sink.ordered-writes</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Sets the MongoDB driver option that controls whether bulk writes are 
performed in order. Defaults to true (ordered writes).</td>
+    </tr>
+    <tr>
+      <td><h5>sink.bypass-document-validation</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Sets the MongoDB driver option that controls whether bulk writes bypass 
document validation. Defaults to false (documents are validated).</td>
+    </tr>
     </tbody>
 </table>
 
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
index 49aa35b..7199b2a 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/MongoSinkBuilder.java
@@ -126,6 +126,28 @@ public class MongoSinkBuilder<IN> {
         return this;
     }
 
+    /**
+     * Set the ordered write {@link com.mongodb.client.model.BulkWriteOptions}.
+     *
+     * @param ordered describes the write behaviour
+     * @return this builder
+     */
+    public MongoSinkBuilder<IN> setOrderedWrites(boolean ordered) {
+        writeOptionsBuilder.setOrderedWrites(ordered);
+        return this;
+    }
+
+    /**
+     * Set the bypass document validation {@link 
com.mongodb.client.model.BulkWriteOptions}.
+     *
+     * @param bypassDocumentValidation describes document validation behaviour
+     * @return this builder
+     */
+    public MongoSinkBuilder<IN> setBypassDocumentValidation(boolean 
bypassDocumentValidation) {
+        
writeOptionsBuilder.setBypassDocumentValidation(bypassDocumentValidation);
+        return this;
+    }
+
     /**
      * Sets the serialization schema which is invoked on every record to 
convert it to MongoDB bulk
      * request.
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
index 15f4293..576d199 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/config/MongoWriteOptions.java
@@ -26,7 +26,9 @@ import java.util.Objects;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_INTERVAL;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUFFER_FLUSH_MAX_ROWS;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_BYPASS_VALIDATION;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_MAX_RETRIES;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_ORDERED_WRITES;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SINK_RETRY_INTERVAL;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,6 +42,8 @@ public final class MongoWriteOptions implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    private final boolean orderedWrites;
+    private final boolean bypassDocumentValidation;
     private final int batchSize;
     private final long batchIntervalMs;
     private final int maxRetries;
@@ -47,11 +51,15 @@ public final class MongoWriteOptions implements 
Serializable {
     private final DeliveryGuarantee deliveryGuarantee;
 
     private MongoWriteOptions(
+            boolean orderedWrites,
+            boolean bypassDocumentValidation,
             int batchSize,
             long batchIntervalMs,
             int maxRetries,
             long retryIntervalMs,
             DeliveryGuarantee deliveryGuarantee) {
+        this.orderedWrites = orderedWrites;
+        this.bypassDocumentValidation = bypassDocumentValidation;
         this.batchSize = batchSize;
         this.batchIntervalMs = batchIntervalMs;
         this.maxRetries = maxRetries;
@@ -59,6 +67,14 @@ public final class MongoWriteOptions implements Serializable 
{
         this.deliveryGuarantee = deliveryGuarantee;
     }
 
+    public boolean isOrderedWrites() {
+        return orderedWrites;
+    }
+
+    public boolean isBypassDocumentValidation() {
+        return bypassDocumentValidation;
+    }
+
     public int getBatchSize() {
         return batchSize;
     }
@@ -88,7 +104,9 @@ public final class MongoWriteOptions implements Serializable 
{
             return false;
         }
         MongoWriteOptions that = (MongoWriteOptions) o;
-        return batchSize == that.batchSize
+        return orderedWrites == that.orderedWrites
+                && bypassDocumentValidation == that.bypassDocumentValidation
+                && batchSize == that.batchSize
                 && batchIntervalMs == that.batchIntervalMs
                 && maxRetries == that.maxRetries
                 && retryIntervalMs == that.retryIntervalMs
@@ -98,7 +116,13 @@ public final class MongoWriteOptions implements 
Serializable {
     @Override
     public int hashCode() {
         return Objects.hash(
-                batchSize, batchIntervalMs, maxRetries, retryIntervalMs, 
deliveryGuarantee);
+                orderedWrites,
+                bypassDocumentValidation,
+                batchSize,
+                batchIntervalMs,
+                maxRetries,
+                retryIntervalMs,
+                deliveryGuarantee);
     }
 
     public static MongoWriteOptionsBuilder builder() {
@@ -108,6 +132,8 @@ public final class MongoWriteOptions implements 
Serializable {
     /** Builder for {@link MongoWriteOptions}. */
     @PublicEvolving
     public static class MongoWriteOptionsBuilder {
+        private boolean orderedWrites = SINK_ORDERED_WRITES.defaultValue();
+        private boolean bypassDocumentValidation = 
SINK_BYPASS_VALIDATION.defaultValue();
         private int batchSize = BUFFER_FLUSH_MAX_ROWS.defaultValue();
         private long batchIntervalMs = 
BUFFER_FLUSH_INTERVAL.defaultValue().toMillis();
         private int maxRetries = SINK_MAX_RETRIES.defaultValue();
@@ -116,6 +142,31 @@ public final class MongoWriteOptions implements 
Serializable {
 
         private MongoWriteOptionsBuilder() {}
 
+        /**
+         * Sets the mongodb bulk write option ordered {@link
+         * com.mongodb.client.model.BulkWriteOptions}.
+         *
+         * @param orderedWrites bulk write option
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setOrderedWrites(boolean 
orderedWrites) {
+            this.orderedWrites = orderedWrites;
+            return this;
+        }
+
+        /**
+         * Sets the mongodb bulk write option bypassDocumentValidation {@link
+         * com.mongodb.client.model.BulkWriteOptions}.
+         *
+         * @param bypassDocumentValidation bulk write option to bypass 
document validation
+         * @return this builder
+         */
+        public MongoWriteOptionsBuilder setBypassDocumentValidation(
+                boolean bypassDocumentValidation) {
+            this.bypassDocumentValidation = bypassDocumentValidation;
+            return this;
+        }
+
         /**
          * Sets the maximum number of actions to buffer for each batch 
request. You can pass -1 to
          * disable batching.
@@ -195,7 +246,13 @@ public final class MongoWriteOptions implements 
Serializable {
          */
         public MongoWriteOptions build() {
             return new MongoWriteOptions(
-                    batchSize, batchIntervalMs, maxRetries, retryIntervalMs, 
deliveryGuarantee);
+                    orderedWrites,
+                    bypassDocumentValidation,
+                    batchSize,
+                    batchIntervalMs,
+                    maxRetries,
+                    retryIntervalMs,
+                    deliveryGuarantee);
         }
     }
 }
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
index cbd9a61..b806f9e 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import com.mongodb.MongoException;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
+import com.mongodb.client.model.BulkWriteOptions;
 import com.mongodb.client.model.WriteModel;
 import org.bson.BsonDocument;
 import org.slf4j.Logger;
@@ -213,7 +214,12 @@ public class MongoWriter<IN> implements SinkWriter<IN> {
                 mongoClient
                         .getDatabase(connectionOptions.getDatabase())
                         .getCollection(connectionOptions.getCollection(), 
BsonDocument.class)
-                        .bulkWrite(bulkRequests);
+                        .bulkWrite(
+                                bulkRequests,
+                                new BulkWriteOptions()
+                                        
.ordered(writeOptions.isOrderedWrites())
+                                        .bypassDocumentValidation(
+                                                
writeOptions.isBypassDocumentValidation()));
                 ackTime = System.currentTimeMillis();
                 bulkRequests.clear();
                 break;
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
index 36948f2..b03e70c 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
@@ -115,6 +115,18 @@ public class MongoConnectorOptions {
                     .withDescription(
                             "Specifies the retry time interval if lookup 
records from database failed.");
 
+    public static final ConfigOption<Boolean> SINK_ORDERED_WRITES =
+            ConfigOptions.key("sink.ordered-writes")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Specifies mongodb bulk write ordered 
option");
+
+    public static final ConfigOption<Boolean> SINK_BYPASS_VALIDATION =
+            ConfigOptions.key("sink.bypass-document-validation")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Specifies mongodb bulk write option to 
bypass validation");
+
     public static final ConfigOption<Integer> BUFFER_FLUSH_MAX_ROWS =
             ConfigOptions.key("sink.buffer-flush.max-rows")
                     .intType()
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
index d7b4a07..0e1c274 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
 import 
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
 import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -34,12 +35,18 @@ import org.apache.flink.testutils.junit.SharedReference;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.UpdateOneModel;
 import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.ValidationAction;
+import com.mongodb.client.model.ValidationOptions;
 import com.mongodb.client.model.WriteModel;
 import org.bson.BsonDocument;
+import org.bson.BsonType;
 import org.bson.Document;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -54,7 +61,9 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
+import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
+import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenInAnyOrder;
+import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenInOrder;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link MongoSink}. */
@@ -93,6 +102,93 @@ class MongoSinkITCase {
         }
     }
 
+    @Test
+    void testOrderedWrite() throws Exception {
+        final String collection = "test-sink-with-ordered-write";
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, 
false);
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.enableCheckpointing(100L);
+
+        env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWrittenInOrder(collectionOf(collection), 1, 2, 3, 4, 
5);
+    }
+
+    @Test
+    void testUnorderedWrite() throws Exception {
+        final String collection = "test-sink-with-unordered-write";
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, false, 
false);
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.enableCheckpointing(100L);
+
+        env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 3, 
4, 5);
+    }
+
+    @Test
+    void testDocumentValidation() throws Exception {
+        final String collection = "test-sink-with-doc-validation";
+        // Create collection with validation
+        ValidationOptions validationOptions =
+                new ValidationOptions().validator(Filters.type("_id", 
BsonType.INT64));
+        validationOptions.validationAction(ValidationAction.ERROR);
+        CreateCollectionOptions createCollectionOptions = new 
CreateCollectionOptions();
+        createCollectionOptions.validationOptions(validationOptions);
+        mongoClient
+                .getDatabase(TEST_DATABASE)
+                .createCollection(collection, createCollectionOptions);
+
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, 
false);
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.enableCheckpointing(100L);
+
+        String[] data = new String[] {"1", "2", "3", "4", "5", "A"};
+        env.fromData(data).map(id -> new Document("_id", id).append("f1", "d_" 
+ id)).sinkTo(sink);
+        Assertions.assertThrows(JobExecutionException.class, env::execute);
+
+        assertThatIdsAreNotWritten(collectionOf(collection), data);
+    }
+
+    @Test
+    void testBypassDocumentValidation() throws Exception {
+        final String collection = "test-sink-with-bypass-doc-validation";
+        // Create collection with validation
+        ValidationOptions validationOptions =
+                new ValidationOptions().validator(Filters.type("_id", 
BsonType.INT64));
+        validationOptions.validationAction(ValidationAction.ERROR);
+        CreateCollectionOptions createCollectionOptions = new 
CreateCollectionOptions();
+        createCollectionOptions.validationOptions(validationOptions);
+        mongoClient
+                .getDatabase(TEST_DATABASE)
+                .createCollection(collection, createCollectionOptions);
+
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, 
true);
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.enableCheckpointing(100L);
+
+        String[] data = new String[] {"1", "2", "3", "4", "5", "A"};
+        env.fromData(data).map(id -> new Document("_id", id).append("f1", "d_" 
+ id)).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), data);
+    }
+
     @ParameterizedTest
     @EnumSource(
             value = DeliveryGuarantee.class,
@@ -101,7 +197,7 @@ class MongoSinkITCase {
     void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee 
deliveryGuarantee)
             throws Exception {
         final String collection = "test-sink-with-delivery-" + 
deliveryGuarantee;
-        final MongoSink<Document> sink = createSink(collection, 
deliveryGuarantee);
+        final MongoSink<Document> sink = createSink(collection, 
deliveryGuarantee, true, false);
         Configuration config = new Configuration();
         config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
         final StreamExecutionEnvironment env =
@@ -110,13 +206,14 @@ class MongoSinkITCase {
 
         env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
         env.execute();
-        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+        assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 3, 
4, 5);
     }
 
     @Test
     void testRecovery() throws Exception {
         final String collection = "test-recovery-mongo-sink";
-        final MongoSink<Document> sink = createSink(collection, 
DeliveryGuarantee.AT_LEAST_ONCE);
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, 
false);
 
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(100L);
@@ -129,18 +226,23 @@ class MongoSinkITCase {
                 .sinkTo(sink);
 
         env.execute();
-        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+        assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 3, 
4, 5);
         assertThat(failed.get()).isTrue();
     }
 
     private static MongoSink<Document> createSink(
-            String collection, DeliveryGuarantee deliveryGuarantee) {
+            String collection,
+            DeliveryGuarantee deliveryGuarantee,
+            boolean ordered,
+            boolean bypassDocumentValidation) {
         return MongoSink.<Document>builder()
                 .setUri(MONGO_CONTAINER.getConnectionString())
                 .setDatabase(TEST_DATABASE)
                 .setCollection(collection)
                 .setBatchSize(5)
                 .setDeliveryGuarantee(deliveryGuarantee)
+                .setOrderedWrites(ordered)
+                .setBypassDocumentValidation(bypassDocumentValidation)
                 .setSerializationSchema(new UpsertSerializationSchema())
                 .build();
     }
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index 2c5ba84..13cff69 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -55,7 +55,7 @@ import java.io.IOException;
 import java.util.Optional;
 
 import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
-import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
+import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenInAnyOrder;
 import static 
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenWithMaxWaitTime;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
@@ -120,14 +120,14 @@ class MongoWriterITCase {
 
             // Trigger flush
             writer.write(buildMessage(5), null);
-            assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+            assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 
3, 4, 5);
 
             writer.write(buildMessage(6), null);
             assertThatIdsAreNotWritten(collectionOf(collection), 6);
 
             // Force flush
             writer.doBulkWrite();
-            assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5, 
6);
+            assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 
3, 4, 5, 6);
         }
     }
 
@@ -169,7 +169,7 @@ class MongoWriterITCase {
             // Trigger flush
             writer.flush(false);
 
-            assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3);
+            assertThatIdsAreWrittenInAnyOrder(collectionOf(collection), 1, 2, 
3);
         }
     }
 
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
index d6c5bf4..ea0ed54 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java
@@ -87,13 +87,37 @@ public class MongoTestUtil {
         assertThat(idsAreWritten).isEmpty();
     }
 
-    public static void assertThatIdsAreWritten(MongoCollection<Document> coll, 
Integer... ids) {
+    public static void assertThatIdsAreNotWritten(MongoCollection<Document> 
coll, String... ids) {
+        List<String> idsAreWritten = new ArrayList<>();
+        coll.find(Filters.in("_id", ids)).map(d -> 
d.getString("_id")).into(idsAreWritten);
+
+        assertThat(idsAreWritten).isEmpty();
+    }
+
+    public static void assertThatIdsAreWrittenInAnyOrder(
+            MongoCollection<Document> coll, Integer... ids) {
         List<Integer> actualIds = new ArrayList<>();
         coll.find(Filters.in("_id", ids)).map(d -> 
d.getInteger("_id")).into(actualIds);
 
         assertThat(actualIds).containsExactlyInAnyOrder(ids);
     }
 
+    public static void assertThatIdsAreWrittenInAnyOrder(
+            MongoCollection<Document> coll, String... ids) {
+        List<String> actualIds = new ArrayList<>();
+        coll.find(Filters.in("_id", ids)).map(d -> 
d.getString("_id")).into(actualIds);
+
+        assertThat(actualIds).containsExactlyInAnyOrder(ids);
+    }
+
+    public static void assertThatIdsAreWrittenInOrder(
+            MongoCollection<Document> coll, Integer... ids) {
+        List<Integer> actualIds = new ArrayList<>();
+        coll.find(Filters.in("_id", ids)).map(d -> 
d.getInteger("_id")).into(actualIds);
+
+        assertThat(actualIds).containsExactly(ids);
+    }
+
     public static void assertThatIdsAreWrittenWithMaxWaitTime(
             MongoCollection<Document> coll, long maxWaitTimeMs, Integer... ids)
             throws InterruptedException {
@@ -104,7 +128,7 @@ public class MongoTestUtil {
             }
             Thread.sleep(1000L);
         }
-        assertThatIdsAreWritten(coll, ids);
+        assertThatIdsAreWrittenInAnyOrder(coll, ids);
     }
 
     public static String getConnectorSql(

Reply via email to