This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b1203c905 [Improve][Connector-v2][Mongodb]sink support transaction 
update/writing (#5034)
b1203c905 is described below

commit b1203c905e81d9b161e801e79f908f154d91ae2f
Author: monster <[email protected]>
AuthorDate: Mon Jul 10 20:09:41 2023 +0800

    [Improve][Connector-v2][Mongodb]sink support transaction update/writing 
(#5034)
---
 docs/en/connector-v2/sink/MongoDB.md               |   3 +-
 .../seatunnel/mongodb/config/MongodbConfig.java    |   3 +
 .../seatunnel/mongodb/sink/MongodbSink.java        |  44 +++++-
 .../seatunnel/mongodb/sink/MongodbWriter.java      |  62 +++++++-
 .../mongodb/sink/MongodbWriterOptions.java         |  18 ++-
 .../sink/commit/CommittableTransaction.java        |  49 ++++++
 .../sink/commit/CommittableUpsertTransaction.java  |  68 +++++++++
 .../commit/MongodbSinkAggregatedCommitter.java     | 167 +++++++++++++++++++++
 .../seatunnel/mongodb/sink/state/DocumentBulk.java |  61 ++++++++
 .../sink/state/MongodbAggregatedCommitInfo.java    |  30 ++++
 .../mongodb/sink/state/MongodbCommitInfo.java      |  30 ++++
 .../connector/v2/mongodb/AbstractMongodbIT.java    |   5 +
 .../e2e/connector/v2/mongodb/MongodbIT.java        |  27 ++++
 .../fake_source_to_transaction_sink_mongodb.conf   | 102 +++++++++++++
 .../fake_source_to_transaction_upsert_mongodb.conf | 104 +++++++++++++
 .../mongodb_source_transaction_sink_to_assert.conf | 115 ++++++++++++++
 ...ongodb_source_transaction_upsert_to_assert.conf | 115 ++++++++++++++
 17 files changed, 987 insertions(+), 16 deletions(-)

diff --git a/docs/en/connector-v2/sink/MongoDB.md 
b/docs/en/connector-v2/sink/MongoDB.md
index 464ecdeab..c4cbad95e 100644
--- a/docs/en/connector-v2/sink/MongoDB.md
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -11,7 +11,7 @@
 Key Features
 ------------
 
-- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
 - [x] [cdc](../../concept/connector-v2-features.md)
 
 **Tips**
@@ -73,6 +73,7 @@ The following table lists the field data type mapping from 
MongoDB BSON type to
 | retry.interval        | Duration | No       | 1000    | Specifies the retry 
time interval if writing records to database failed, the unit is millisecond.   
                         |
 | upsert-enable         | Boolean  | No       | false   | Whether to write 
documents via upsert mode.                                                      
                            |
 | primary-key           | List     | No       | -       | The primary keys for 
upsert/update. Keys are in `["id","name",...]` format for properties.           
                        |
+| transaction           | Boolean  | No       | false   | Whether to use 
transactions in MongoSink (requires MongoDB 4.2+).                              
                              |
 | common-options        |          | No       | -       | Source plugin common 
parameters, please refer to [Source Common Options](common-options.md) for 
details                      |
 
 ### Tips
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
index 132263125..848a120e2 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -150,4 +150,7 @@ public class MongodbConfig {
                     .withDescription(
                             "The primary keys for upsert/update. Keys are in 
csv format for properties.")
                     .withFallbackKeys("upsert-key");
+
+    public static final Option<Boolean> TRANSACTION =
+            Options.key("transaction").booleanType().defaultValue(false).withDescription("Whether to use transactions in MongoSink (requires MongoDB 4.2+).");
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index fa2c212c3..160aa966a 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -20,25 +20,33 @@ package 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
 
 import com.google.auto.service.AutoService;
 
 import java.util.List;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
 
 @AutoService(SeaTunnelSink.class)
-public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class MongodbSink
+        implements SeaTunnelSink<
+                SeaTunnelRow, DocumentBulk, MongodbCommitInfo, 
MongodbAggregatedCommitInfo> {
 
     private MongodbWriterOptions options;
 
@@ -89,6 +97,10 @@ public class MongodbSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             if (pluginConfig.hasPath(MongodbConfig.RETRY_INTERVAL.key())) {
                 
builder.withRetryInterval(pluginConfig.getLong(MongodbConfig.RETRY_INTERVAL.key()));
             }
+
+            if (pluginConfig.hasPath(MongodbConfig.TRANSACTION.key())) {
+                
builder.withTransaction(pluginConfig.getBoolean(MongodbConfig.TRANSACTION.key()));
+            }
             this.options = builder.build();
         }
     }
@@ -109,7 +121,8 @@ public class MongodbSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     }
 
     @Override
-    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
+    public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> 
createWriter(
+            SinkWriter.Context context) {
         return new MongodbWriter(
                 new RowDataDocumentSerializer(
                         
RowDataToBsonConverters.createConverter(seaTunnelRowType),
@@ -118,4 +131,27 @@ public class MongodbSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
                 options,
                 context);
     }
+
+    @Override
+    public Optional<Serializer<DocumentBulk>> getWriterStateSerializer() {
+        return options.transaction ? Optional.of(new DefaultSerializer<>()) : 
Optional.empty();
+    }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<MongodbCommitInfo, 
MongodbAggregatedCommitInfo>>
+            createAggregatedCommitter() {
+        return options.transaction
+                ? Optional.of(new MongodbSinkAggregatedCommitter(options))
+                : Optional.empty();
+    }
+
+    @Override
+    public Optional<Serializer<MongodbAggregatedCommitInfo>> 
getAggregatedCommitInfoSerializer() {
+        return options.transaction ? Optional.of(new DefaultSerializer<>()) : 
Optional.empty();
+    }
+
+    @Override
+    public Optional<Serializer<MongodbCommitInfo>> getCommitInfoSerializer() {
+        return options.transaction ? Optional.of(new DefaultSerializer<>()) : 
Optional.empty();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
index 794c3bf04..0eb131d44 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
@@ -20,16 +20,19 @@ package 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
 
 import org.bson.BsonDocument;
 
 import com.mongodb.MongoException;
 import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.UpdateOneModel;
 import com.mongodb.client.model.WriteModel;
 import lombok.extern.slf4j.Slf4j;
 
@@ -37,12 +40,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;
 
 @Slf4j
-public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class MongodbWriter implements SinkWriter<SeaTunnelRow, 
MongodbCommitInfo, DocumentBulk> {
 
     private MongodbClientProvider collectionProvider;
 
@@ -60,6 +65,8 @@ public class MongodbWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
 
     private volatile long lastSendTime = 0L;
 
+    private boolean transaction;
+
     // TODO:Reserve parameters.
     private final SinkWriter.Context context;
 
@@ -84,27 +91,66 @@ public class MongodbWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
                         .build();
         this.bulkActions = options.getFlushSize();
         this.batchIntervalMs = options.getBatchIntervalMs();
+        this.transaction = options.transaction;
     }
 
     @Override
     public void write(SeaTunnelRow o) {
         if (o.getRowKind() != RowKind.UPDATE_BEFORE) {
             bulkRequests.add(serializer.serializeToWriteModel(o));
-            if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
+            if (!transaction && (isOverMaxBatchSizeLimit() || 
isOverMaxBatchIntervalLimit())) {
                 doBulkWrite();
             }
         }
     }
 
-    @Override
-    public Optional<Void> prepareCommit() {
-        doBulkWrite();
-        return Optional.empty();
+    public Optional<MongodbCommitInfo> prepareCommit() {
+        if (!transaction) {
+            doBulkWrite();
+            return Optional.empty();
+        }
+
+        List<DocumentBulk> bsonDocuments = new ArrayList<>();
+        AtomicInteger counter = new AtomicInteger();
+
+        bulkRequests.stream()
+                .map(this::convertModelToBsonDocument)
+                .collect(
+                        Collectors.groupingBy(
+                                it -> counter.getAndIncrement() / 
DocumentBulk.BUFFER_SIZE))
+                .values()
+                .stream()
+                .map(this::convertBsonDocumentListToDocumentBulk)
+                .forEach(bsonDocuments::add);
+
+        bulkRequests.clear();
+
+        return Optional.of(new MongodbCommitInfo(bsonDocuments));
+    }
+
+    private BsonDocument convertModelToBsonDocument(WriteModel<BsonDocument> 
model) {
+        if (model instanceof InsertOneModel) {
+            return ((InsertOneModel<BsonDocument>) model).getDocument();
+        } else if (model instanceof UpdateOneModel) {
+            return (BsonDocument) ((UpdateOneModel<BsonDocument>) 
model).getUpdate();
+        }
+        return null;
+    }
+
+    private DocumentBulk 
convertBsonDocumentListToDocumentBulk(List<BsonDocument> documentList) {
+        DocumentBulk documentBulk = new DocumentBulk();
+        documentList.forEach(documentBulk::add);
+        return documentBulk;
     }
 
+    @Override
+    public void abortPrepare() {}
+
     @Override
     public void close() {
-        doBulkWrite();
+        if (!transaction) {
+            doBulkWrite();
+        }
         if (collectionProvider != null) {
             collectionProvider.close();
         }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
index be8becd32..e9b826477 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
@@ -44,16 +44,19 @@ public class MongodbWriterOptions implements Serializable {
 
     protected final long retryInterval;
 
+    protected final boolean transaction;
+
     public MongodbWriterOptions(
             String connectString,
             String database,
             String collection,
             int flushSize,
-            Long batchIntervalMs,
+            long batchIntervalMs,
             boolean upsertEnable,
             String[] primaryKey,
             int retryMax,
-            Long retryInterval) {
+            long retryInterval,
+            boolean transaction) {
         this.connectString = connectString;
         this.database = database;
         this.collection = collection;
@@ -63,6 +66,7 @@ public class MongodbWriterOptions implements Serializable {
         this.primaryKey = primaryKey;
         this.retryMax = retryMax;
         this.retryInterval = retryInterval;
+        this.transaction = transaction;
     }
 
     public static Builder builder() {
@@ -89,6 +93,8 @@ public class MongodbWriterOptions implements Serializable {
 
         protected long retryInterval;
 
+        protected boolean transaction;
+
         public Builder withConnectString(String connectString) {
             this.connectString = connectString;
             return this;
@@ -134,6 +140,11 @@ public class MongodbWriterOptions implements Serializable {
             return this;
         }
 
+        public Builder withTransaction(boolean transaction) {
+            this.transaction = transaction;
+            return this;
+        }
+
         public MongodbWriterOptions build() {
             return new MongodbWriterOptions(
                     connectString,
@@ -144,7 +155,8 @@ public class MongodbWriterOptions implements Serializable {
                     upsertEnable,
                     primaryKey,
                     retryMax,
-                    retryInterval);
+                    retryInterval,
+                    transaction);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableTransaction.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableTransaction.java
new file mode 100644
index 000000000..42b61edbe
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableTransaction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.TransactionBody;
+import com.mongodb.client.result.InsertManyResult;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CommittableTransaction implements TransactionBody<Integer>, 
Serializable {
+
+    private static final int BUFFER_INIT_SIZE = 1024;
+
+    protected final MongoCollection<BsonDocument> collection;
+
+    protected List<BsonDocument> bufferedDocuments = new 
ArrayList<>(BUFFER_INIT_SIZE);
+
+    public CommittableTransaction(
+            MongoCollection<BsonDocument> collection, List<BsonDocument> 
documents) {
+        this.collection = collection;
+        this.bufferedDocuments.addAll(documents);
+    }
+
+    @Override
+    public Integer execute() {
+        InsertManyResult result = collection.insertMany(bufferedDocuments);
+        return result.getInsertedIds().size();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableUpsertTransaction.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableUpsertTransaction.java
new file mode 100644
index 000000000..1fa3669e9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableUpsertTransaction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit;
+
+import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
+
+import com.mongodb.bulk.BulkWriteResult;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CommittableUpsertTransaction extends CommittableTransaction {
+
+    private final String[] upsertKeys;
+    private final UpdateOptions updateOptions = new UpdateOptions();
+    private final BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
+
+    public CommittableUpsertTransaction(
+            MongoCollection<BsonDocument> collection,
+            List<BsonDocument> documents,
+            String[] upsertKeys) {
+        super(collection, documents);
+        this.upsertKeys = upsertKeys;
+        updateOptions.upsert(true);
+        bulkWriteOptions.ordered(true);
+    }
+
+    @Override
+    public Integer execute() {
+        List<UpdateOneModel<BsonDocument>> upserts = new ArrayList<>();
+        for (BsonDocument document : bufferedDocuments) {
+            List<Bson> filters = new ArrayList<>(upsertKeys.length);
+            for (String upsertKey : upsertKeys) {
+                Object o = document.get("$set").asDocument().get(upsertKey);
+                Bson eq = Filters.eq(upsertKey, o);
+                filters.add(eq);
+            }
+            Bson filter = Filters.and(filters);
+            UpdateOneModel<BsonDocument> updateOneModel =
+                    new UpdateOneModel<>(filter, document, updateOptions);
+            upserts.add(updateOneModel);
+        }
+
+        BulkWriteResult bulkWriteResult = collection.bulkWrite(upserts, 
bulkWriteOptions);
+        return bulkWriteResult.getUpserts().size() + 
bulkWriteResult.getInsertedCount();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.java
new file mode 100644
index 000000000..0ee73a301
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.ReadConcern;
+import com.mongodb.ReadPreference;
+import com.mongodb.TransactionOptions;
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class MongodbSinkAggregatedCommitter
+        implements SinkAggregatedCommitter<MongodbCommitInfo, 
MongodbAggregatedCommitInfo> {
+
+    private static final long waitingTime = 5_000L;
+
+    private static final long TRANSACTION_TIMEOUT_MS = 60_000L;
+
+    private final boolean enableUpsert;
+
+    private final String[] upsertKeys;
+
+    private final MongodbClientProvider collectionProvider;
+
+    private ClientSession clientSession;
+
+    private MongoClient client;
+
+    public MongodbSinkAggregatedCommitter(MongodbWriterOptions options) {
+        this.enableUpsert = options.isUpsertEnable();
+        this.upsertKeys = options.getPrimaryKey();
+        this.collectionProvider =
+                MongodbCollectionProvider.builder()
+                        .connectionString(options.getConnectString())
+                        .database(options.getDatabase())
+                        .collection(options.getCollection())
+                        .build();
+    }
+
+    @Override
+    public List<MongodbAggregatedCommitInfo> commit(
+            List<MongodbAggregatedCommitInfo> aggregatedCommitInfo) {
+        return aggregatedCommitInfo.stream()
+                .map(this::processAggregatedCommitInfo)
+                .filter(
+                        failedAggregatedCommitInfo ->
+                                
!failedAggregatedCommitInfo.getCommitInfos().isEmpty())
+                .collect(Collectors.toList());
+    }
+
+    private MongodbAggregatedCommitInfo processAggregatedCommitInfo(
+            MongodbAggregatedCommitInfo aggregatedCommitInfo) {
+        List<MongodbCommitInfo> failedCommitInfos =
+                aggregatedCommitInfo.getCommitInfos().stream()
+                        .flatMap(
+                                (Function<MongodbCommitInfo, 
Stream<List<DocumentBulk>>>)
+                                        this::processCommitInfo)
+                        .filter(failedDocumentBulks -> 
!failedDocumentBulks.isEmpty())
+                        .map(MongodbCommitInfo::new)
+                        .collect(Collectors.toList());
+
+        return new MongodbAggregatedCommitInfo(failedCommitInfos);
+    }
+
+    private Stream<List<DocumentBulk>> processCommitInfo(MongodbCommitInfo 
commitInfo) {
+        client = collectionProvider.getClient();
+        clientSession = client.startSession();
+        MongoCollection<BsonDocument> collection = 
collectionProvider.getDefaultCollection();
+        return Stream.of(
+                commitInfo.getDocumentBulks().stream()
+                        .filter(bulk -> !bulk.getDocuments().isEmpty())
+                        .filter(
+                                bulk -> {
+                                    try {
+                                        CommittableTransaction transaction;
+                                        if (enableUpsert) {
+                                            transaction =
+                                                    new 
CommittableUpsertTransaction(
+                                                            collection,
+                                                            
bulk.getDocuments(),
+                                                            upsertKeys);
+                                        } else {
+                                            transaction =
+                                                    new CommittableTransaction(
+                                                            collection, 
bulk.getDocuments());
+                                        }
+
+                                        int insertedDocs =
+                                                clientSession.withTransaction(
+                                                        transaction,
+                                                        
TransactionOptions.builder()
+                                                                
.readPreference(
+                                                                        
ReadPreference.primary())
+                                                                
.readConcern(ReadConcern.LOCAL)
+                                                                
.writeConcern(WriteConcern.MAJORITY)
+                                                                .build());
+                                        log.info(
+                                                "Inserted {} documents into 
collection {}.",
+                                                insertedDocs,
+                                                collection.getNamespace());
+                                        return false;
+                                    } catch (Exception e) {
+                                        log.error("Failed to commit with Mongo 
transaction.", e);
+                                        return true;
+                                    }
+                                })
+                        .collect(Collectors.toList()));
+    }
+
+    @Override
+    public MongodbAggregatedCommitInfo combine(List<MongodbCommitInfo> 
commitInfos) {
+        return new MongodbAggregatedCommitInfo(commitInfos);
+    }
+
+    @Override
+    public void abort(List<MongodbAggregatedCommitInfo> aggregatedCommitInfo) 
{}
+
+    @SneakyThrows
+    @Override
+    public void close() {
+        long deadline = System.currentTimeMillis() + TRANSACTION_TIMEOUT_MS;
+        while (clientSession != null && clientSession.hasActiveTransaction() && System.currentTimeMillis() < deadline) {
+            // wait for active transaction to finish or timeout
+            Thread.sleep(waitingTime);
+        }
+        if (clientSession != null) {
+            clientSession.close();
+        }
+        if (client != null) {
+            client.close();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/DocumentBulk.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/DocumentBulk.java
new file mode 100644
index 000000000..72a3d1053
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/DocumentBulk.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state;
+
+import org.bson.BsonDocument;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DocumentBulk is buffered {@link BsonDocument} in memory, which would be 
written to MongoDB in a
+ * single transaction. For execution efficiency, each DocumentBulk may be 
limited to a maximum
+ * size, typically 1,024 documents. But for the transactional mode, the 
maximum size should not be
+ * respected because all that data must be written in one transaction.
+ */
+@ToString
+@EqualsAndHashCode
+public class DocumentBulk implements Serializable {
+
+    public static final int BUFFER_SIZE = 1024;
+
+    private final List<BsonDocument> bufferedDocuments;
+
+    public DocumentBulk() {
+        bufferedDocuments = new ArrayList<>(BUFFER_SIZE);
+    }
+
+    public void add(BsonDocument document) {
+        if (bufferedDocuments.size() == BUFFER_SIZE) {
+            throw new IllegalStateException("DocumentBulk is already full");
+        }
+        bufferedDocuments.add(document);
+    }
+
+    public int size() {
+        return bufferedDocuments.size();
+    }
+
+    public List<BsonDocument> getDocuments() {
+        return bufferedDocuments;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbAggregatedCommitInfo.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbAggregatedCommitInfo.java
new file mode 100644
index 000000000..6b97d616a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbAggregatedCommitInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class MongodbAggregatedCommitInfo implements Serializable {
+    List<MongodbCommitInfo> commitInfos;
+}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbCommitInfo.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbCommitInfo.java
new file mode 100644
index 000000000..052cd4c5a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbCommitInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class MongodbCommitInfo implements Serializable {
+    List<DocumentBulk> documentBulks;
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
index 4c85c0d09..5dbe7cf34 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
@@ -82,6 +82,11 @@ public abstract class AbstractMongodbIT extends 
TestSuiteBase implements TestRes
 
     protected static final String MONGODB_CDC_RESULT_TABLE = "test_cdc_table";
 
+    protected static final String MONGODB_TRANSACTION_SINK_TABLE =
+            "test_source_transaction_sink_table";
+    protected static final String MONGODB_TRANSACTION_UPSERT_TABLE =
+            "test_source_upsert_transaction_table";
+
     protected GenericContainer<?> mongodbContainer;
 
     protected MongoClient client;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index bc6f88400..fb643455a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -174,4 +174,31 @@ public class MongodbIT extends AbstractMongodbIT {
                         .collect(Collectors.toList()));
         clearDate(MONGODB_MATCH_RESULT_TABLE);
     }
+
+    @TestTemplate
+    public void testTransactionSinkAndUpsert(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult insertResult =
+                
container.executeJob("/transactionIT/fake_source_to_transaction_sink_mongodb.conf");
+        Assertions.assertEquals(0, insertResult.getExitCode(), 
insertResult.getStderr());
+
+        Container.ExecResult assertSinkResult =
+                container.executeJob(
+                        
"/transactionIT/mongodb_source_transaction_sink_to_assert.conf");
+        Assertions.assertEquals(0, assertSinkResult.getExitCode(), 
assertSinkResult.getStderr());
+
+        Container.ExecResult upsertResult =
+                container.executeJob(
+                        
"/transactionIT/fake_source_to_transaction_upsert_mongodb.conf");
+        Assertions.assertEquals(0, upsertResult.getExitCode(), 
upsertResult.getStderr());
+
+        Container.ExecResult assertUpsertResult =
+                container.executeJob(
+                        
"/transactionIT/mongodb_source_transaction_upsert_to_assert.conf");
+        Assertions.assertEquals(
+                0, assertUpsertResult.getExitCode(), 
assertUpsertResult.getStderr());
+
+        clearDate(MONGODB_TRANSACTION_SINK_TABLE);
+        clearDate(MONGODB_TRANSACTION_UPSERT_TABLE);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
new file mode 100644
index 000000000..67947eb95
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 50
+    int.template = [3]
+    split.num = 5
+    split.read-interval = 100
+    result_table_name = "mongodb_table"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(33, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(33, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  MongoDB {
+    uri = "mongodb://e2e_mongodb:27017"
+    database = "test_db"
+    collection = "test_source_transaction_sink_table"
+    transaction = true
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(33, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(33, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_upsert_mongodb.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_upsert_mongodb.conf
new file mode 100644
index 000000000..53a98fe28
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_upsert_mongodb.conf
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 50
+    int.template = [2]
+    split.num = 5
+    split.read-interval = 100
+    result_table_name = "mongodb_table"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(33, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(33, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  MongoDB {
+    uri = "mongodb://e2e_mongodb:27017"
+    database = "test_db"
+    collection = "test_source_upsert_transaction_table"
+    transaction = true
+    upsert-enable = true
+    primary-key = ["c_int"]
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(33, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(33, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_sink_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_sink_to_assert.conf
new file mode 100644
index 000000000..f453ff5df
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_sink_to_assert.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  MongoDB {
+    uri = "mongodb://e2e_mongodb:27017/test_db"
+    database = "test_db"
+    collection = "test_source_transaction_sink_table"
+    cursor.no-timeout = true
+    result_table_name = "mongodb_table"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(33, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(33, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "mongodb_table"
+  }
+  Assert {
+    source_table_name = "mongodb_table"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 50
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 50
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_upsert_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_upsert_to_assert.conf
new file mode 100644
index 000000000..0a5f8e5e1
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_upsert_to_assert.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  MongoDB {
+    uri = "mongodb://e2e_mongodb:27017/test_db"
+    database = "test_db"
+    collection = "test_source_upsert_transaction_table"
+    cursor.no-timeout = true
+    result_table_name = "mongodb_table"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(33, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(33, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "mongodb_table"
+  }
+  Assert {
+    source_table_name = "mongodb_table"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 1
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 1
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}

Reply via email to