Savonitar commented on code in PR #58:
URL: https://github.com/apache/flink-connector-mongodb/pull/58#discussion_r2613884362


##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java:
##########
@@ -92,6 +92,34 @@ static void tearDown() {
         }
     }
 
+    @Test
+    void unorderedWrite() throws Exception {
+        final String collection = "test-sink-with-unordered-write";
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, false, true);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+    }
+
+    @Test
+    void bypassDocumentValidation() throws Exception {
+        final String collection = "test-sink-with-bypass-doc-validation";
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);

Review Comment:
   > createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);
   
   In this test, you are passing `false` to the bypass parameter, effectively disabling the feature you intend to test. Or am I missing something?



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java:
##########
@@ -92,6 +92,34 @@ static void tearDown() {
         }
     }
 
+    @Test
+    void unorderedWrite() throws Exception {

Review Comment:
   nit: The naming style is slightly inconsistent (`unorderedWrite` vs `testRecovery`).
   Maybe it would be cleaner to pick one style?



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java:
##########
@@ -92,6 +92,34 @@ static void tearDown() {
         }
     }
 
+    @Test
+    void unorderedWrite() throws Exception {
+        final String collection = "test-sink-with-unordered-write";
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, false, true);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+    }
+
+    @Test
+    void bypassDocumentValidation() throws Exception {

Review Comment:
   Same as https://github.com/apache/flink-connector-mongodb/pull/58/changes#r2613860139



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java:
##########
@@ -92,6 +92,34 @@ static void tearDown() {
         }
     }
 
+    @Test
+    void unorderedWrite() throws Exception {
+        final String collection = "test-sink-with-unordered-write";
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, false, true);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);

Review Comment:
   The current tests don't fully validate the behavioral difference between ordered and unordered writes: every write in them succeeds, so both modes produce the same result.
   It would be valuable to add a test case that injects a failure (e.g., a duplicate key error) in the middle of a batch.
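
To make the expected difference concrete, here is a minimal, dependency-free sketch of the semantics such a test would need to observe. It is an in-memory model, not the MongoDB driver or the connector: `BulkInsertModel`, `Result`, and `bulkInsert` are hypothetical names, and the model processes the batch sequentially, whereas a real unordered bulk write gives no ordering guarantee. It assumes the documented MongoDB behavior that an ordered bulk write stops at the first error while an unordered one attempts the remaining operations.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Dependency-free model of ordered vs. unordered bulk inserts (not the MongoDB driver). */
final class BulkInsertModel {

    /** Outcome of one simulated bulk insert. */
    static final class Result {
        final List<Integer> inserted = new ArrayList<>();
        final List<Integer> rejected = new ArrayList<>();
    }

    /**
     * Inserts each id into a collection that enforces unique ids.
     * With ordered=true the batch stops at the first duplicate;
     * with ordered=false the duplicate is recorded and the rest is still attempted.
     */
    static Result bulkInsert(Set<Integer> collection, List<Integer> batch, boolean ordered) {
        Result result = new Result();
        for (int id : batch) {
            if (collection.add(id)) {
                result.inserted.add(id);
            } else {
                result.rejected.add(id);
                if (ordered) {
                    break; // ordered writes abort on the first error
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> batch = List.of(1, 2, 3, 4, 5);

        // Pre-seed id 3 so the batch hits a duplicate key in the middle.
        Set<Integer> orderedTarget = new LinkedHashSet<>(List.of(3));
        Set<Integer> unorderedTarget = new LinkedHashSet<>(List.of(3));

        Result ordered = bulkInsert(orderedTarget, batch, true);
        Result unordered = bulkInsert(unorderedTarget, batch, false);

        System.out.println("ordered inserted:   " + ordered.inserted);   // [1, 2]
        System.out.println("unordered inserted: " + unordered.inserted); // [1, 2, 4, 5]
    }
}
```

In an integration test the same idea would translate to pre-inserting a document with a conflicting `_id` and asserting which of the remaining ids end up in the collection for each mode.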



##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/MongoSinkITCase.java:
##########
@@ -92,6 +92,34 @@ static void tearDown() {
         }
     }
 
+    @Test
+    void unorderedWrite() throws Exception {
+        final String collection = "test-sink-with-unordered-write";
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, false, true);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);
+    }
+
+    @Test
+    void bypassDocumentValidation() throws Exception {
+        final String collection = "test-sink-with-bypass-doc-validation";
+        final MongoSink<Document> sink =
+                createSink(collection, DeliveryGuarantee.AT_LEAST_ONCE, true, false);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(100L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        env.fromSequence(1, 5).map(new TestMapFunction()).sinkTo(sink);
+        env.execute();
+        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4, 5);

Review Comment:
   I think the current test passes regardless of the flag because the collection has no validation rules.
   To verify that `bypassDocumentValidation` works, maybe we can create the collection with a validator and assert that writes only succeed when the bypass flag is enabled?
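
As a rough illustration of the assertion shape suggested above, the following dependency-free sketch models a collection validator and a bypass flag. It is not the MongoDB driver or the connector: `ValidationBypassModel`, `REQUIRES_NAME`, and `write` are hypothetical names, and the "must have a String `name` field" rule is an assumed validator chosen only for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Dependency-free model of document validation with a bypass flag (not the MongoDB driver). */
final class ValidationBypassModel {

    /** Hypothetical validator: every document must carry a String "name" field. */
    static final Predicate<Map<String, Object>> REQUIRES_NAME =
            doc -> doc.get("name") instanceof String;

    /**
     * Stores each document unless it fails validation while the bypass flag is off,
     * in which case the write is rejected with an exception.
     */
    static List<Map<String, Object>> write(
            List<Map<String, Object>> docs,
            Predicate<Map<String, Object>> validator,
            boolean bypassDocumentValidation) {
        List<Map<String, Object>> stored = new ArrayList<>();
        for (Map<String, Object> doc : docs) {
            if (!bypassDocumentValidation && !validator.test(doc)) {
                throw new IllegalStateException("Document failed validation: " + doc);
            }
            stored.add(doc);
        }
        return stored;
    }

    public static void main(String[] args) {
        // The document deliberately violates the validator; a valid one would pass either way.
        Map<String, Object> invalid = Map.of("_id", 1);
        List<Map<String, Object>> docs = List.of(invalid);

        System.out.println("bypass on:  stored " + write(docs, REQUIRES_NAME, true).size());
        try {
            write(docs, REQUIRES_NAME, false);
            System.out.println("bypass off: unexpectedly stored");
        } catch (IllegalStateException e) {
            System.out.println("bypass off: rejected");
        }
    }
}
```

The key point for the real test is the same as in the model: the documents written by the sink must violate the validator, otherwise the test would again pass regardless of the flag.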



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.