This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b1203c905 [Improve][Connector-v2][Mongodb]sink support transaction
update/writing (#5034)
b1203c905 is described below
commit b1203c905e81d9b161e801e79f908f154d91ae2f
Author: monster <[email protected]>
AuthorDate: Mon Jul 10 20:09:41 2023 +0800
[Improve][Connector-v2][Mongodb]sink support transaction update/writing
(#5034)
---
docs/en/connector-v2/sink/MongoDB.md | 3 +-
.../seatunnel/mongodb/config/MongodbConfig.java | 3 +
.../seatunnel/mongodb/sink/MongodbSink.java | 44 +++++-
.../seatunnel/mongodb/sink/MongodbWriter.java | 62 +++++++-
.../mongodb/sink/MongodbWriterOptions.java | 18 ++-
.../sink/commit/CommittableTransaction.java | 49 ++++++
.../sink/commit/CommittableUpsertTransaction.java | 68 +++++++++
.../commit/MongodbSinkAggregatedCommitter.java | 167 +++++++++++++++++++++
.../seatunnel/mongodb/sink/state/DocumentBulk.java | 61 ++++++++
.../sink/state/MongodbAggregatedCommitInfo.java | 30 ++++
.../mongodb/sink/state/MongodbCommitInfo.java | 30 ++++
.../connector/v2/mongodb/AbstractMongodbIT.java | 5 +
.../e2e/connector/v2/mongodb/MongodbIT.java | 27 ++++
.../fake_source_to_transaction_sink_mongodb.conf | 102 +++++++++++++
.../fake_source_to_transaction_upsert_mongodb.conf | 104 +++++++++++++
.../mongodb_source_transaction_sink_to_assert.conf | 115 ++++++++++++++
...ongodb_source_transaction_upsert_to_assert.conf | 115 ++++++++++++++
17 files changed, 987 insertions(+), 16 deletions(-)
diff --git a/docs/en/connector-v2/sink/MongoDB.md
b/docs/en/connector-v2/sink/MongoDB.md
index 464ecdeab..c4cbad95e 100644
--- a/docs/en/connector-v2/sink/MongoDB.md
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -11,7 +11,7 @@
Key Features
------------
-- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)
**Tips**
@@ -73,6 +73,7 @@ The following table lists the field data type mapping from
MongoDB BSON type to
| retry.interval | Duration | No | 1000 | Specifies the retry
time interval if writing records to database failed, the unit is millisecond.
|
| upsert-enable | Boolean | No | false | Whether to write
documents via upsert mode.
|
| primary-key | List | No | - | The primary keys for
upsert/update. Keys are in `["id","name",...]` format for properties.
|
+| transaction | Boolean | No | false | Whether to use
transactions in MongoSink (requires MongoDB 4.2+).
|
| common-options | | No | - | Source plugin common
parameters, please refer to [Source Common Options](common-options.md) for
details |
### Tips
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
index 132263125..848a120e2 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -150,4 +150,7 @@ public class MongodbConfig {
.withDescription(
"The primary keys for upsert/update. Keys are in
csv format for properties.")
.withFallbackKeys("upsert-key");
+
+ public static final Option<Boolean> TRANSACTION =
+
Options.key("transaction").booleanType().defaultValue(false).withDescription(".");
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index fa2c212c3..160aa966a 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -20,25 +20,33 @@ package
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
import com.google.auto.service.AutoService;
import java.util.List;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
@AutoService(SeaTunnelSink.class)
-public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class MongodbSink
+ implements SeaTunnelSink<
+ SeaTunnelRow, DocumentBulk, MongodbCommitInfo,
MongodbAggregatedCommitInfo> {
private MongodbWriterOptions options;
@@ -89,6 +97,10 @@ public class MongodbSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
if (pluginConfig.hasPath(MongodbConfig.RETRY_INTERVAL.key())) {
builder.withRetryInterval(pluginConfig.getLong(MongodbConfig.RETRY_INTERVAL.key()));
}
+
+ if (pluginConfig.hasPath(MongodbConfig.TRANSACTION.key())) {
+
builder.withTransaction(pluginConfig.getBoolean(MongodbConfig.TRANSACTION.key()));
+ }
this.options = builder.build();
}
}
@@ -109,7 +121,8 @@ public class MongodbSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
}
@Override
- public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
+ public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk>
createWriter(
+ SinkWriter.Context context) {
return new MongodbWriter(
new RowDataDocumentSerializer(
RowDataToBsonConverters.createConverter(seaTunnelRowType),
@@ -118,4 +131,27 @@ public class MongodbSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
options,
context);
}
+
+ @Override
+ public Optional<Serializer<DocumentBulk>> getWriterStateSerializer() {
+ return options.transaction ? Optional.of(new DefaultSerializer<>()) :
Optional.empty();
+ }
+
+ @Override
+ public Optional<SinkAggregatedCommitter<MongodbCommitInfo,
MongodbAggregatedCommitInfo>>
+ createAggregatedCommitter() {
+ return options.transaction
+ ? Optional.of(new MongodbSinkAggregatedCommitter(options))
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<Serializer<MongodbAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
+ return options.transaction ? Optional.of(new DefaultSerializer<>()) :
Optional.empty();
+ }
+
+ @Override
+ public Optional<Serializer<MongodbCommitInfo>> getCommitInfoSerializer() {
+ return options.transaction ? Optional.of(new DefaultSerializer<>()) :
Optional.empty();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
index 794c3bf04..0eb131d44 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
@@ -20,16 +20,19 @@ package
org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
import org.bson.BsonDocument;
import com.mongodb.MongoException;
import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import lombok.extern.slf4j.Slf4j;
@@ -37,12 +40,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.WRITER_OPERATION_FAILED;
@Slf4j
-public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class MongodbWriter implements SinkWriter<SeaTunnelRow,
MongodbCommitInfo, DocumentBulk> {
private MongodbClientProvider collectionProvider;
@@ -60,6 +65,8 @@ public class MongodbWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
private volatile long lastSendTime = 0L;
+ private boolean transaction;
+
// TODO: Reserve parameters.
private final SinkWriter.Context context;
@@ -84,27 +91,66 @@ public class MongodbWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
.build();
this.bulkActions = options.getFlushSize();
this.batchIntervalMs = options.getBatchIntervalMs();
+ this.transaction = options.transaction;
}
@Override
public void write(SeaTunnelRow o) {
if (o.getRowKind() != RowKind.UPDATE_BEFORE) {
bulkRequests.add(serializer.serializeToWriteModel(o));
- if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
+ if (!transaction && (isOverMaxBatchSizeLimit() ||
isOverMaxBatchIntervalLimit())) {
doBulkWrite();
}
}
}
- @Override
- public Optional<Void> prepareCommit() {
- doBulkWrite();
- return Optional.empty();
+ public Optional<MongodbCommitInfo> prepareCommit() {
+ if (!transaction) {
+ doBulkWrite();
+ return Optional.empty();
+ }
+
+ List<DocumentBulk> bsonDocuments = new ArrayList<>();
+ AtomicInteger counter = new AtomicInteger();
+
+ bulkRequests.stream()
+ .map(this::convertModelToBsonDocument)
+ .collect(
+ Collectors.groupingBy(
+ it -> counter.getAndIncrement() /
DocumentBulk.BUFFER_SIZE))
+ .values()
+ .stream()
+ .map(this::convertBsonDocumentListToDocumentBulk)
+ .forEach(bsonDocuments::add);
+
+ bulkRequests.clear();
+
+ return Optional.of(new MongodbCommitInfo(bsonDocuments));
+ }
+
+ private BsonDocument convertModelToBsonDocument(WriteModel<BsonDocument>
model) {
+ if (model instanceof InsertOneModel) {
+ return ((InsertOneModel<BsonDocument>) model).getDocument();
+ } else if (model instanceof UpdateOneModel) {
+ return (BsonDocument) ((UpdateOneModel<BsonDocument>)
model).getUpdate();
+ }
+ return null;
+ }
+
+ private DocumentBulk
convertBsonDocumentListToDocumentBulk(List<BsonDocument> documentList) {
+ DocumentBulk documentBulk = new DocumentBulk();
+ documentList.forEach(documentBulk::add);
+ return documentBulk;
}
+ @Override
+ public void abortPrepare() {}
+
@Override
public void close() {
- doBulkWrite();
+ if (!transaction) {
+ doBulkWrite();
+ }
if (collectionProvider != null) {
collectionProvider.close();
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
index be8becd32..e9b826477 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
@@ -44,16 +44,19 @@ public class MongodbWriterOptions implements Serializable {
protected final long retryInterval;
+ protected final boolean transaction;
+
public MongodbWriterOptions(
String connectString,
String database,
String collection,
int flushSize,
- Long batchIntervalMs,
+ long batchIntervalMs,
boolean upsertEnable,
String[] primaryKey,
int retryMax,
- Long retryInterval) {
+ long retryInterval,
+ boolean transaction) {
this.connectString = connectString;
this.database = database;
this.collection = collection;
@@ -63,6 +66,7 @@ public class MongodbWriterOptions implements Serializable {
this.primaryKey = primaryKey;
this.retryMax = retryMax;
this.retryInterval = retryInterval;
+ this.transaction = transaction;
}
public static Builder builder() {
@@ -89,6 +93,8 @@ public class MongodbWriterOptions implements Serializable {
protected long retryInterval;
+ protected boolean transaction;
+
public Builder withConnectString(String connectString) {
this.connectString = connectString;
return this;
@@ -134,6 +140,11 @@ public class MongodbWriterOptions implements Serializable {
return this;
}
+ public Builder withTransaction(boolean transaction) {
+ this.transaction = transaction;
+ return this;
+ }
+
public MongodbWriterOptions build() {
return new MongodbWriterOptions(
connectString,
@@ -144,7 +155,8 @@ public class MongodbWriterOptions implements Serializable {
upsertEnable,
primaryKey,
retryMax,
- retryInterval);
+ retryInterval,
+ transaction);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableTransaction.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableTransaction.java
new file mode 100644
index 000000000..42b61edbe
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableTransaction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.TransactionBody;
+import com.mongodb.client.result.InsertManyResult;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CommittableTransaction implements TransactionBody<Integer>,
Serializable {
+
+ private static final int BUFFER_INIT_SIZE = 1024;
+
+ protected final MongoCollection<BsonDocument> collection;
+
+ protected List<BsonDocument> bufferedDocuments = new
ArrayList<>(BUFFER_INIT_SIZE);
+
+ public CommittableTransaction(
+ MongoCollection<BsonDocument> collection, List<BsonDocument>
documents) {
+ this.collection = collection;
+ this.bufferedDocuments.addAll(documents);
+ }
+
+ @Override
+ public Integer execute() {
+ InsertManyResult result = collection.insertMany(bufferedDocuments);
+ return result.getInsertedIds().size();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableUpsertTransaction.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableUpsertTransaction.java
new file mode 100644
index 000000000..1fa3669e9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/CommittableUpsertTransaction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit;
+
+import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
+
+import com.mongodb.bulk.BulkWriteResult;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CommittableUpsertTransaction extends CommittableTransaction {
+
+ private final String[] upsertKeys;
+ private final UpdateOptions updateOptions = new UpdateOptions();
+ private final BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
+
+ public CommittableUpsertTransaction(
+ MongoCollection<BsonDocument> collection,
+ List<BsonDocument> documents,
+ String[] upsertKeys) {
+ super(collection, documents);
+ this.upsertKeys = upsertKeys;
+ updateOptions.upsert(true);
+ bulkWriteOptions.ordered(true);
+ }
+
+ @Override
+ public Integer execute() {
+ List<UpdateOneModel<BsonDocument>> upserts = new ArrayList<>();
+ for (BsonDocument document : bufferedDocuments) {
+ List<Bson> filters = new ArrayList<>(upsertKeys.length);
+ for (String upsertKey : upsertKeys) {
+ Object o = document.get("$set").asDocument().get(upsertKey);
+ Bson eq = Filters.eq(upsertKey, o);
+ filters.add(eq);
+ }
+ Bson filter = Filters.and(filters);
+ UpdateOneModel<BsonDocument> updateOneModel =
+ new UpdateOneModel<>(filter, document, updateOptions);
+ upserts.add(updateOneModel);
+ }
+
+ BulkWriteResult bulkWriteResult = collection.bulkWrite(upserts,
bulkWriteOptions);
+ return bulkWriteResult.getUpserts().size() +
bulkWriteResult.getInsertedCount();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.java
new file mode 100644
index 000000000..0ee73a301
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/commit/MongodbSinkAggregatedCommitter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
+
+import org.bson.BsonDocument;
+
+import com.mongodb.ReadConcern;
+import com.mongodb.ReadPreference;
+import com.mongodb.TransactionOptions;
+import com.mongodb.WriteConcern;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class MongodbSinkAggregatedCommitter
+ implements SinkAggregatedCommitter<MongodbCommitInfo,
MongodbAggregatedCommitInfo> {
+
+ private static final long waitingTime = 5_000L;
+
+ private static final long TRANSACTION_TIMEOUT_MS = 60_000L;
+
+ private final boolean enableUpsert;
+
+ private final String[] upsertKeys;
+
+ private final MongodbClientProvider collectionProvider;
+
+ private ClientSession clientSession;
+
+ private MongoClient client;
+
+ public MongodbSinkAggregatedCommitter(MongodbWriterOptions options) {
+ this.enableUpsert = options.isUpsertEnable();
+ this.upsertKeys = options.getPrimaryKey();
+ this.collectionProvider =
+ MongodbCollectionProvider.builder()
+ .connectionString(options.getConnectString())
+ .database(options.getDatabase())
+ .collection(options.getCollection())
+ .build();
+ }
+
+ @Override
+ public List<MongodbAggregatedCommitInfo> commit(
+ List<MongodbAggregatedCommitInfo> aggregatedCommitInfo) {
+ return aggregatedCommitInfo.stream()
+ .map(this::processAggregatedCommitInfo)
+ .filter(
+ failedAggregatedCommitInfo ->
+
!failedAggregatedCommitInfo.getCommitInfos().isEmpty())
+ .collect(Collectors.toList());
+ }
+
+ private MongodbAggregatedCommitInfo processAggregatedCommitInfo(
+ MongodbAggregatedCommitInfo aggregatedCommitInfo) {
+ List<MongodbCommitInfo> failedCommitInfos =
+ aggregatedCommitInfo.getCommitInfos().stream()
+ .flatMap(
+ (Function<MongodbCommitInfo,
Stream<List<DocumentBulk>>>)
+ this::processCommitInfo)
+ .filter(failedDocumentBulks ->
!failedDocumentBulks.isEmpty())
+ .map(MongodbCommitInfo::new)
+ .collect(Collectors.toList());
+
+ return new MongodbAggregatedCommitInfo(failedCommitInfos);
+ }
+
+ private Stream<List<DocumentBulk>> processCommitInfo(MongodbCommitInfo
commitInfo) {
+ client = collectionProvider.getClient();
+ clientSession = client.startSession();
+ MongoCollection<BsonDocument> collection =
collectionProvider.getDefaultCollection();
+ return Stream.of(
+ commitInfo.getDocumentBulks().stream()
+ .filter(bulk -> !bulk.getDocuments().isEmpty())
+ .filter(
+ bulk -> {
+ try {
+ CommittableTransaction transaction;
+ if (enableUpsert) {
+ transaction =
+ new
CommittableUpsertTransaction(
+ collection,
+
bulk.getDocuments(),
+ upsertKeys);
+ } else {
+ transaction =
+ new CommittableTransaction(
+ collection,
bulk.getDocuments());
+ }
+
+ int insertedDocs =
+ clientSession.withTransaction(
+ transaction,
+
TransactionOptions.builder()
+
.readPreference(
+
ReadPreference.primary())
+
.readConcern(ReadConcern.LOCAL)
+
.writeConcern(WriteConcern.MAJORITY)
+ .build());
+ log.info(
+ "Inserted {} documents into
collection {}.",
+ insertedDocs,
+ collection.getNamespace());
+ return false;
+ } catch (Exception e) {
+ log.error("Failed to commit with Mongo
transaction.", e);
+ return true;
+ }
+ })
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public MongodbAggregatedCommitInfo combine(List<MongodbCommitInfo>
commitInfos) {
+ return new MongodbAggregatedCommitInfo(commitInfos);
+ }
+
+ @Override
+ public void abort(List<MongodbAggregatedCommitInfo> aggregatedCommitInfo)
{}
+
+ @SneakyThrows
+ @Override
+ public void close() {
+ long deadline = System.currentTimeMillis() + TRANSACTION_TIMEOUT_MS;
+ while (clientSession.hasActiveTransaction() &&
System.currentTimeMillis() < deadline) {
+ // wait for active transaction to finish or timeout
+ Thread.sleep(waitingTime);
+ }
+ if (clientSession != null) {
+ clientSession.close();
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/DocumentBulk.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/DocumentBulk.java
new file mode 100644
index 000000000..72a3d1053
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/DocumentBulk.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state;
+
+import org.bson.BsonDocument;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DocumentBulk is buffered {@link BsonDocument} in memory, which would be
written to MongoDB in a
+ * single transaction. Due to execution efficiency, each DocumentBulk maybe be
limited to a maximum
+ * size, typically 1,000 documents. But for the transactional mode, the
maximum size should not be
+ * respected because all that data must be written in one transaction.
+ */
+@ToString
+@EqualsAndHashCode
+public class DocumentBulk implements Serializable {
+
+ public static final int BUFFER_SIZE = 1024;
+
+ private final List<BsonDocument> bufferedDocuments;
+
+ public DocumentBulk() {
+ bufferedDocuments = new ArrayList<>(BUFFER_SIZE);
+ }
+
+ public void add(BsonDocument document) {
+ if (bufferedDocuments.size() == BUFFER_SIZE) {
+ throw new IllegalStateException("DocumentBulk is already full");
+ }
+ bufferedDocuments.add(document);
+ }
+
+ public int size() {
+ return bufferedDocuments.size();
+ }
+
+ public List<BsonDocument> getDocuments() {
+ return bufferedDocuments;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbAggregatedCommitInfo.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbAggregatedCommitInfo.java
new file mode 100644
index 000000000..6b97d616a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbAggregatedCommitInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class MongodbAggregatedCommitInfo implements Serializable {
+ List<MongodbCommitInfo> commitInfos;
+}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbCommitInfo.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbCommitInfo.java
new file mode 100644
index 000000000..052cd4c5a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/state/MongodbCommitInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class MongodbCommitInfo implements Serializable {
+ List<DocumentBulk> documentBulks;
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
index 4c85c0d09..5dbe7cf34 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
@@ -82,6 +82,11 @@ public abstract class AbstractMongodbIT extends
TestSuiteBase implements TestRes
protected static final String MONGODB_CDC_RESULT_TABLE = "test_cdc_table";
+ protected static final String MONGODB_TRANSACTION_SINK_TABLE =
+ "test_source_transaction_sink_table";
+ protected static final String MONGODB_TRANSACTION_UPSERT_TABLE =
+ "test_source_upsert_transaction_table";
+
protected GenericContainer<?> mongodbContainer;
protected MongoClient client;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index bc6f88400..fb643455a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -174,4 +174,31 @@ public class MongodbIT extends AbstractMongodbIT {
.collect(Collectors.toList()));
clearDate(MONGODB_MATCH_RESULT_TABLE);
}
+
+ @TestTemplate
+ public void testTransactionSinkAndUpsert(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult insertResult =
+
container.executeJob("/transactionIT/fake_source_to_transaction_sink_mongodb.conf");
+ Assertions.assertEquals(0, insertResult.getExitCode(),
insertResult.getStderr());
+
+ Container.ExecResult assertSinkResult =
+ container.executeJob(
+
"/transactionIT/mongodb_source_transaction_sink_to_assert.conf");
+ Assertions.assertEquals(0, assertSinkResult.getExitCode(),
assertSinkResult.getStderr());
+
+ Container.ExecResult upsertResult =
+ container.executeJob(
+
"/transactionIT/fake_source_to_transaction_upsert_mongodb.conf");
+ Assertions.assertEquals(0, upsertResult.getExitCode(),
upsertResult.getStderr());
+
+ Container.ExecResult assertUpsertResult =
+ container.executeJob(
+
"/transactionIT/mongodb_source_transaction_upsert_to_assert.conf");
+ Assertions.assertEquals(
+ 0, assertUpsertResult.getExitCode(),
assertUpsertResult.getStderr());
+
+ clearDate(MONGODB_TRANSACTION_SINK_TABLE);
+ clearDate(MONGODB_TRANSACTION_UPSERT_TABLE);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
new file mode 100644
index 000000000..67947eb95
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_sink_mongodb.conf
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ row.num = 50
+ int.template = [3]
+ split.num = 5
+ split.read-interval = 100
+ result_table_name = "mongodb_table"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017"
+ database = "test_db"
+ collection = "test_source_transaction_sink_table"
+ transaction = true
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_upsert_mongodb.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_upsert_mongodb.conf
new file mode 100644
index 000000000..53a98fe28
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/fake_source_to_transaction_upsert_mongodb.conf
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ row.num = 50
+ int.template = [2]
+ split.num = 5
+ split.read-interval = 100
+ result_table_name = "mongodb_table"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017"
+ database = "test_db"
+ collection = "test_source_upsert_transaction_table"
+ transaction = true
+ upsert-enable = true
+ primary-key = ["c_int"]
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_sink_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_sink_to_assert.conf
new file mode 100644
index 000000000..f453ff5df
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_sink_to_assert.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017/test_db"
+ database = "test_db"
+ collection = "test_source_transaction_sink_table"
+ cursor.no-timeout = true
+ result_table_name = "mongodb_table"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "mongodb_table"
+ }
+ Assert {
+ source_table_name = "mongodb_table"
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 50
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 50
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_upsert_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_upsert_to_assert.conf
new file mode 100644
index 000000000..0a5f8e5e1
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/transactionIT/mongodb_source_transaction_upsert_to_assert.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017/test_db"
+ database = "test_db"
+ collection = "test_source_upsert_transaction_table"
+ cursor.no-timeout = true
+ result_table_name = "mongodb_table"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_int = int
+ c_bigint = bigint
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(33, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "mongodb_table"
+ }
+ Assert {
+ source_table_name = "mongodb_table"
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}