This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git
The following commit(s) were added to refs/heads/main by this push:
new e1b9684 [hotfix][test][connectors/mongodb] Update MongoWriterITCase
to be compatible with updated SinkV2 interfaces
e1b9684 is described below
commit e1b96849dc314b8b31a460757168972f17fbec1a
Author: Jiabao Sun <[email protected]>
AuthorDate: Mon Jan 29 09:50:00 2024 +0800
[hotfix][test][connectors/mongodb] Update MongoWriterITCase to be
compatible with updated SinkV2 interfaces
This closes #22.
---
.../mongodb/sink/writer/MongoWriterITCase.java | 33 +++++++++++-----------
1 file changed, 16 insertions(+), 17 deletions(-)
diff --git
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
index 84767e8..bd3ca66 100644
---
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
+++
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java
@@ -17,8 +17,9 @@
package org.apache.flink.connector.mongodb.sink.writer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
-import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
+import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import
org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
@@ -49,6 +50,7 @@ import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
+import java.io.IOException;
import java.util.Optional;
import static
org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
@@ -235,7 +237,7 @@ public class MongoWriterITCase {
MongoWriteOptions.builder()
.setBatchSize(batchSize)
.setBatchIntervalMs(batchIntervalMs)
- .setMaxRetries(0)
+ .setDeliveryGuarantee(DeliveryGuarantee.NONE)
.build();
MongoSerializationSchema<Document> testSerializationSchema =
@@ -269,7 +271,8 @@ public class MongoWriterITCase {
}
private static MongoWriter<Document> createWriter(
- String collection, int batchSize, long batchIntervalMs, boolean
flushOnCheckpoint) {
+ String collection, int batchSize, long batchIntervalMs, boolean
flushOnCheckpoint)
+ throws IOException {
return createWriter(
collection,
batchSize,
@@ -283,28 +286,24 @@ public class MongoWriterITCase {
int batchSize,
long batchIntervalMs,
boolean flushOnCheckpoint,
- MongoSerializationSchema<Document> serializationSchema) {
+ MongoSerializationSchema<Document> serializationSchema)
+ throws IOException {
- MongoConnectionOptions connectionOptions =
- MongoConnectionOptions.builder()
+ MongoSink<Document> mongoSink =
+ MongoSink.<Document>builder()
.setUri(MONGO_CONTAINER.getConnectionString())
.setDatabase(TEST_DATABASE)
.setCollection(collection)
- .build();
-
- MongoWriteOptions writeOptions =
- MongoWriteOptions.builder()
.setBatchSize(batchSize)
.setBatchIntervalMs(batchIntervalMs)
- .setMaxRetries(0)
+ .setDeliveryGuarantee(
+ flushOnCheckpoint
+ ? DeliveryGuarantee.AT_LEAST_ONCE
+ : DeliveryGuarantee.NONE)
+ .setSerializationSchema(serializationSchema)
.build();
- return new MongoWriter<>(
- connectionOptions,
- writeOptions,
- flushOnCheckpoint,
- sinkInitContext,
- serializationSchema);
+ return (MongoWriter<Document>) mongoSink.createWriter(sinkInitContext);
}
private static Document buildMessage(int id) {