This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cb651cd7f3 [Feature][connector-v2][mongodb] mongodb support cdc sink (#4833)
cb651cd7f3 is described below
commit cb651cd7f35bd81e9c721a03f8376ca10b39c1f4
Author: monster <[email protected]>
AuthorDate: Mon Jun 26 16:27:26 2023 +0800
[Feature][connector-v2][mongodb] mongodb support cdc sink (#4833)
---
docs/en/connector-v2/sink/MongoDB.md | 49 +++--
release-note.md | 1 +
.../seatunnel/mongodb/config/MongodbConfig.java | 7 +-
.../mongodb/serde/RowDataDocumentSerializer.java | 73 ++++++-
.../seatunnel/mongodb/sink/MongoKeyExtractor.java | 6 +-
.../seatunnel/mongodb/sink/MongodbSink.java | 15 +-
.../seatunnel/mongodb/sink/MongodbSinkFactory.java | 2 +-
.../seatunnel/mongodb/sink/MongodbWriter.java | 11 +-
.../mongodb/sink/MongodbWriterOptions.java | 14 +-
.../connector-mongodb-e2e/pom.xml | 7 +-
.../{MongodbIT.java => AbstractMongodbIT.java} | 235 ++++++---------------
.../e2e/connector/v2/mongodb/MongodbCDCIT.java | 90 ++++++++
.../e2e/connector/v2/mongodb/MongodbIT.java | 203 +-----------------
.../resources/cdcIT/fake_cdc_sink_mongodb.conf | 75 +++++++
.../cdcIT/fake_cdc_upsert_sink_mongodb.conf | 88 ++++++++
.../src/test/resources/fake_source_to_mongodb.conf | 1 -
.../fake_source_to_updateMode_insert_mongodb.conf | 2 +-
.../updateIT/fake_source_to_update_mongodb.conf | 2 +-
18 files changed, 449 insertions(+), 432 deletions(-)
diff --git a/docs/en/connector-v2/sink/MongoDB.md b/docs/en/connector-v2/sink/MongoDB.md
index 21ce2d228c..41cd463c63 100644
--- a/docs/en/connector-v2/sink/MongoDB.md
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -13,7 +13,11 @@ Key Features
------------
- [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [cdc](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+**Tips**
+
+> 1.If you want to use the CDC write feature, it is recommended to enable the upsert-enable configuration.
Description
-----------
@@ -34,9 +38,9 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
Data Type Mapping
-----------------
-The following table lists the field data type mapping from MongoDB BSON type to SeaTunnel data type.
+The following table lists the field data type mapping from MongoDB BSON type to Seatunnel data type.
-| SeaTunnel Data Type | MongoDB BSON Type |
+| Seatunnel Data Type | MongoDB BSON Type |
|---------------------|-------------------|
| STRING | ObjectId |
| STRING | String |
@@ -62,23 +66,24 @@ The following table lists the field data type mapping from MongoDB BSON type to
Sink Options
------------
-| Name                  | Type     | Required | Default | Description                                                                                                    |
-|-----------------------|----------|----------|---------|----------------------------------------------------------------------------------------------------------------|
-| uri                   | String   | Yes      | -       | The MongoDB connection uri.                                                                                    |
-| database              | String   | Yes      | -       | The name of MongoDB database to read or write.                                                                 |
-| collection            | String   | Yes      | -       | The name of MongoDB collection to read or write.                                                               |
-| schema                | String   | Yes      | -       | MongoDB's BSON and seatunnel data structure mapping                                                            |
-| buffer-flush.max-rows | String   | No       | 1000    | Specifies the maximum number of buffered rows per batch request.                                               |
-| buffer-flush.interval | String   | No       | 30000   | Specifies the retry time interval if writing records to database failed, the unit is seconds.                  |
-| retry.max             | String   | No       | 3       | Specifies the max retry times if writing records to database failed.                                           |
-| retry.interval        | Duration | No       | 1000    | Specifies the retry time interval if writing records to database failed, the unit is millisecond.              |
-| upsert-enable         | Boolean  | No       | false   | Whether to write documents via upsert mode.                                                                    |
-| upsert-key            | List     | No       | -       | The primary keys for upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for properties. |
+| Name                  | Type     | Required | Default | Description                                                                                         |
+|-----------------------|----------|----------|---------|-----------------------------------------------------------------------------------------------------|
+| uri                   | String   | Yes      | -       | The MongoDB connection uri.                                                                         |
+| database              | String   | Yes      | -       | The name of MongoDB database to read or write.                                                      |
+| collection            | String   | Yes      | -       | The name of MongoDB collection to read or write.                                                    |
+| schema                | String   | Yes      | -       | MongoDB's BSON and seatunnel data structure mapping                                                 |
+| buffer-flush.max-rows | String   | No       | 1000    | Specifies the maximum number of buffered rows per batch request.                                    |
+| buffer-flush.interval | String   | No       | 30000   | Specifies the retry time interval if writing records to database failed, the unit is seconds.       |
+| retry.max             | String   | No       | 3       | Specifies the max retry times if writing records to database failed.                                |
+| retry.interval        | Duration | No       | 1000    | Specifies the retry time interval if writing records to database failed, the unit is millisecond.   |
+| upsert-enable         | Boolean  | No       | false   | Whether to write documents via upsert mode.                                                         |
+| primary-key           | List     | No       | -       | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties.          |
**Tips**
> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.
> Data flushing will be triggered if any of these conditions are met.<br/>
+> 2.Compatible with the historical parameter `upsert-key`. If `upsert-key` is set, please do not set `primary-key`.<br/>
How to Create a MongoDB Data Synchronization Jobs
-------------------------------------------------
@@ -198,8 +203,8 @@ The necessity for using transactions can be greatly avoided by designing systems
By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.
-If upsert-key is defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in upsert-key as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
-In the event of a failure, SeaTunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.
+If `primary-key` and `upsert-enable` are defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in `primary-key` as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
+In the event of a failure, Seatunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.
```bash
sink {
@@ -208,7 +213,7 @@ sink {
database = "test_db"
collection = "users"
upsert-enable = true
- upsert-key = ["name","status"]
+ primary-key = ["name","status"]
schema = {
fields {
_id = string
@@ -222,11 +227,15 @@ sink {
## Changelog
-### 2.2.0-beta 2022-09-26
+### 2.2.0-beta
- Add MongoDB Source Connector
+### 2.3.1-release
+
+- [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))
+
### Next Version
-- [Feature]Refactor mongodb source connector([4620](https://github.com/apache/seatunnel/pull/4620))
+- [Feature]Mongodb support cdc sink([4833](https://github.com/apache/seatunnel/pull/4833))
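The idempotent-upsert guarantee described in the doc change above can be illustrated with a small, self-contained sketch. It uses plain JDK maps in place of BSON documents and a MongoDB collection; `UpsertSketch` and its method names are illustrative, not SeaTunnel or MongoDB driver APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertSketch {
    // Extract the values of the configured primary keys, mirroring how the
    // sink derives its upsert filter from the `primary-key` option.
    public static List<Object> keyOf(Map<String, Object> row, List<String> primaryKeys) {
        List<Object> key = new ArrayList<>();
        for (String k : primaryKeys) {
            key.add(row.get(k));
        }
        return key;
    }

    // Upsert: writing the same row twice leaves a single document behind,
    // which is why checkpoint recovery cannot create duplicates.
    public static void upsert(
            Map<List<Object>, Map<String, Object>> store,
            Map<String, Object> row,
            List<String> primaryKeys) {
        store.put(keyOf(row, primaryKeys), row);
    }

    public static void main(String[] args) {
        Map<List<Object>, Map<String, Object>> store = new HashMap<>();
        Map<String, Object> row = new HashMap<>();
        row.put("name", "a");
        row.put("status", "ok");
        List<String> pk = Arrays.asList("name", "status");
        upsert(store, row, pk);
        upsert(store, row, pk); // replayed after recovery: still one document
        System.out.println(store.size()); // 1
    }
}
```

Replaying a batch after recovery simply rewrites the same keys, which is the idempotency the paragraph above relies on.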
diff --git a/release-note.md b/release-note.md
index 80f1479afd..1c09b9af26 100644
--- a/release-note.md
+++ b/release-note.md
@@ -21,6 +21,7 @@
- [Connector-V2] [Kafka] Fix KafkaProducer resources have never been released. (#4302)
- [Connector-V2] [Kafka] Fix the permission problem caused by client.id. (#4246)
- [Connector-V2] [Kafka] Fix KafkaConsumerThread exit caused by commit offset error. (#4379)
+- [Connector-V2] [Mongodb] Mongodb support cdc sink. (#4833)
- [Connector-V2] [kafka] Fix the problem that the partition information can not be obtained when kafka is restored (#4764)
- [Connector-V2] [SFTP] Fix incorrect exception handling logic (#4720)
- [Connector-V2] [File] Fix read temp file (#4876)
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
index 0efe88ad0f..6d008d5bd3 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -142,10 +142,11 @@ public class MongodbConfig {
.defaultValue(false)
                    .withDescription("Whether to write documents via upsert mode.");
- public static final Option<List<String>> UPSERT_KEY =
- Options.key("upsert-key")
+ public static final Option<List<String>> PRIMARY_KEY =
+ Options.key("primary-key")
.listType()
.noDefaultValue()
.withDescription(
-                            "The primary keys for upsert. Only valid in upsert mode. Keys are in csv format for properties.");
+                            "The primary keys for upsert/update. Keys are in csv format for properties.")
+                    .withFallbackKeys("upsert-key");
}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataDocumentSerializer.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataDocumentSerializer.java
index 6ba99fec5d..2e320e4f45 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataDocumentSerializer.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataDocumentSerializer.java
@@ -17,30 +17,37 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
+import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT;
+
public class RowDataDocumentSerializer implements DocumentSerializer<SeaTunnelRow> {
    private final RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter;
-
private final Boolean isUpsertEnable;
-
private final Function<BsonDocument, BsonDocument> filterConditions;
+ private final Map<RowKind, WriteModelSupplier> writeModelSuppliers;
+
    public RowDataDocumentSerializer(
            RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter,
MongodbWriterOptions options,
@@ -48,19 +55,59 @@ public class RowDataDocumentSerializer implements DocumentSerializer<SeaTunnelRo
this.rowDataToBsonConverter = rowDataToBsonConverter;
this.isUpsertEnable = options.isUpsertEnable();
this.filterConditions = filterConditions;
+
+ writeModelSuppliers = createWriteModelSuppliers();
}
- @Override
public WriteModel<BsonDocument> serializeToWriteModel(SeaTunnelRow row) {
- final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
- if (isUpsertEnable) {
- Bson filter = generateFilter(filterConditions.apply(bsonDocument));
- bsonDocument.remove("_id");
- BsonDocument update = new BsonDocument("$set", bsonDocument);
-            return new UpdateOneModel<>(filter, update, new UpdateOptions().upsert(true));
- } else {
- return new InsertOneModel<>(bsonDocument);
+        WriteModelSupplier writeModelSupplier = writeModelSuppliers.get(row.getRowKind());
+        if (writeModelSupplier == null) {
+            throw new MongodbConnectorException(
+                    ILLEGAL_ARGUMENT, "Unsupported message kind: " + row.getRowKind());
}
+ return writeModelSupplier.get(row);
+ }
+
+ private Map<RowKind, WriteModelSupplier> createWriteModelSuppliers() {
+ Map<RowKind, WriteModelSupplier> writeModelSuppliers = new HashMap<>();
+
+        WriteModelSupplier upsertSupplier =
+                row -> {
+                    final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
+                    Bson filter = generateFilter(filterConditions.apply(bsonDocument));
+                    bsonDocument.remove("_id");
+                    BsonDocument update = new BsonDocument("$set", bsonDocument);
+                    return new UpdateOneModel<>(filter, update, new UpdateOptions().upsert(true));
+                };
+
+        WriteModelSupplier updateSupplier =
+                row -> {
+                    final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
+                    Bson filter = generateFilter(filterConditions.apply(bsonDocument));
+                    bsonDocument.remove("_id");
+                    BsonDocument update = new BsonDocument("$set", bsonDocument);
+                    return new UpdateOneModel<>(filter, update);
+                };
+
+        WriteModelSupplier insertSupplier =
+                row -> {
+                    final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
+                    return new InsertOneModel<>(bsonDocument);
+                };
+
+        WriteModelSupplier deleteSupplier =
+                row -> {
+                    final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
+                    Bson filter = generateFilter(filterConditions.apply(bsonDocument));
+                    return new DeleteOneModel<>(filter);
+                };
+
+        writeModelSuppliers.put(RowKind.INSERT, isUpsertEnable ? upsertSupplier : insertSupplier);
+        writeModelSuppliers.put(RowKind.UPDATE_AFTER, isUpsertEnable ? upsertSupplier : updateSupplier);
+        writeModelSuppliers.put(RowKind.DELETE, deleteSupplier);
+
+        return writeModelSuppliers;
}
public static Bson generateFilter(BsonDocument filterConditions) {
@@ -71,4 +118,8 @@ public class RowDataDocumentSerializer implements DocumentSerializer<SeaTunnelRo
return Filters.and(filters);
}
+
+ private interface WriteModelSupplier {
+ WriteModel<BsonDocument> get(SeaTunnelRow row);
+ }
}
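The refactor above replaces the single if/else in `serializeToWriteModel` with a per-`RowKind` lookup table built once in the constructor. A minimal stand-alone sketch of that dispatch pattern, with strings standing in for the driver's `WriteModel` objects and a simplified `RowKind` enum (both are stand-ins, not the real SeaTunnel types):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RowKindDispatchSketch {
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Build the lookup table once, choosing upsert vs plain insert/update up
    // front, as the constructor in the diff does.
    public static Map<RowKind, Function<String, String>> suppliers(boolean upsertEnable) {
        Map<RowKind, Function<String, String>> m = new HashMap<>();
        Function<String, String> upsert = row -> "upsert(" + row + ")";
        Function<String, String> update = row -> "update(" + row + ")";
        Function<String, String> insert = row -> "insert(" + row + ")";
        Function<String, String> delete = row -> "delete(" + row + ")";
        m.put(RowKind.INSERT, upsertEnable ? upsert : insert);
        m.put(RowKind.UPDATE_AFTER, upsertEnable ? upsert : update);
        m.put(RowKind.DELETE, delete);
        return m; // UPDATE_BEFORE intentionally absent: the writer skips it
    }

    public static String serialize(RowKind kind, String row, boolean upsertEnable) {
        Function<String, String> supplier = suppliers(upsertEnable).get(kind);
        if (supplier == null) {
            // Mirrors the MongodbConnectorException thrown for unmapped kinds.
            throw new IllegalArgumentException("Unsupported message kind: " + kind);
        }
        return supplier.apply(row);
    }

    public static void main(String[] args) {
        System.out.println(serialize(RowKind.INSERT, "row", true));  // upsert(row)
        System.out.println(serialize(RowKind.DELETE, "row", true));  // delete(row)
    }
}
```

The map makes the CDC row kinds explicit and turns "unknown kind" into a single null check instead of a trailing else branch.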
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongoKeyExtractor.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongoKeyExtractor.java
index b258c45d65..fe3aff6ae1 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongoKeyExtractor.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongoKeyExtractor.java
@@ -28,15 +28,15 @@ public class MongoKeyExtractor implements SerializableFunction<BsonDocument, Bso
private static final long serialVersionUID = 1L;
- private final String[] upsertKey;
+ private final String[] primaryKey;
public MongoKeyExtractor(MongodbWriterOptions options) {
- upsertKey = options.getUpsertKey();
+ primaryKey = options.getPrimaryKey();
}
@Override
public BsonDocument apply(BsonDocument bsonDocument) {
- return Arrays.stream(upsertKey)
+ return Arrays.stream(primaryKey)
.filter(bsonDocument::containsKey)
.collect(
Collectors.toMap(
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index 47d39a7f70..f0c6a4c746 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConv
import com.google.auto.service.AutoService;
import java.io.IOException;
+import java.util.List;
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
@@ -65,12 +66,20 @@ public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
            builder.withBatchIntervalMs(
                    pluginConfig.getLong(MongodbConfig.BUFFER_FLUSH_INTERVAL.key()));
}
- if (pluginConfig.hasPath(MongodbConfig.UPSERT_KEY.key())) {
- builder.withUpsertKey(
+ if (pluginConfig.hasPath(MongodbConfig.PRIMARY_KEY.key())) {
+ builder.withPrimaryKey(
pluginConfig
- .getStringList(MongodbConfig.UPSERT_KEY.key())
+ .getStringList(MongodbConfig.PRIMARY_KEY.key())
.toArray(new String[0]));
}
+        List<String> fallbackKeys = MongodbConfig.PRIMARY_KEY.getFallbackKeys();
+        fallbackKeys.forEach(
+                key -> {
+                    if (pluginConfig.hasPath(key)) {
+                        builder.withPrimaryKey(
+                                pluginConfig.getStringList(key).toArray(new String[0]));
+                    }
+                });
if (pluginConfig.hasPath(MongodbConfig.UPSERT_ENABLE.key())) {
builder.withUpsertEnable(
pluginConfig.getBoolean(MongodbConfig.UPSERT_ENABLE.key()));
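The hunk above reads `primary-key` first and then applies any fallback keys. A hedged sketch of that resolution order, using a plain `Map` in place of the real `pluginConfig` (note that, mirroring the order of the `hasPath` checks above, a legacy `upsert-key` entry overrides `primary-key` in this sketch — which is why the docs advise not setting both):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FallbackKeySketch {
    // Resolve the key list the way MongodbSink does: new option first, then
    // each fallback key overwrites it if present.
    public static String[] resolvePrimaryKey(Map<String, List<String>> config) {
        String[] primaryKey = null;
        if (config.containsKey("primary-key")) {
            primaryKey = config.get("primary-key").toArray(new String[0]);
        }
        // The fallback loop runs after the primary check, so a legacy
        // `upsert-key` entry wins when both are present.
        for (String key : Collections.singletonList("upsert-key")) {
            if (config.containsKey(key)) {
                primaryKey = config.get(key).toArray(new String[0]);
            }
        }
        return primaryKey;
    }

    public static void main(String[] args) {
        Map<String, List<String>> cfg = new HashMap<>();
        cfg.put("primary-key", Arrays.asList("name", "status"));
        System.out.println(Arrays.toString(resolvePrimaryKey(cfg))); // [name, status]
    }
}
```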
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
index 2b84d672b9..50bb48e98a 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -48,7 +48,7 @@ public class MongodbSinkFactory implements TableSinkFactory {
MongodbConfig.RETRY_MAX,
MongodbConfig.RETRY_INTERVAL,
MongodbConfig.UPSERT_ENABLE,
- MongodbConfig.UPSERT_KEY)
+ MongodbConfig.PRIMARY_KEY)
.build();
}
}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
index 299c6c4a63..0361ad0606 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
@@ -86,10 +87,12 @@ public class MongodbWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
}
@Override
- public void write(SeaTunnelRow o) throws IOException {
- bulkRequests.add(serializer.serializeToWriteModel(o));
- if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
- doBulkWrite();
+ public void write(SeaTunnelRow o) {
+ if (o.getRowKind() != RowKind.UPDATE_BEFORE) {
+ bulkRequests.add(serializer.serializeToWriteModel(o));
+ if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
+ doBulkWrite();
+ }
}
}
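The new `write` method drops `UPDATE_BEFORE` rows (the paired `UPDATE_AFTER` carries the full row) and batches everything else. A simplified, driver-free sketch of that flow; `CdcWriterSketch` and its fields are illustrative stand-ins for the real writer, with strings in place of write models and only the size limit modeled (the real writer also flushes on a time interval):

```java
import java.util.ArrayList;
import java.util.List;

public class CdcWriterSketch {
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    public final List<String> bulkRequests = new ArrayList<>();
    public final List<List<String>> flushed = new ArrayList<>();
    public final int maxBatchSize;

    public CdcWriterSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // UPDATE_BEFORE rows are skipped entirely; other kinds are buffered and
    // flushed once the batch size limit is reached, as in the diff above.
    public void write(RowKind kind, String row) {
        if (kind != RowKind.UPDATE_BEFORE) {
            bulkRequests.add(row);
            if (bulkRequests.size() >= maxBatchSize) {
                doBulkWrite();
            }
        }
    }

    public void doBulkWrite() {
        flushed.add(new ArrayList<>(bulkRequests));
        bulkRequests.clear();
    }

    public static void main(String[] args) {
        CdcWriterSketch w = new CdcWriterSketch(2);
        w.write(RowKind.UPDATE_BEFORE, "old"); // skipped
        w.write(RowKind.UPDATE_AFTER, "new");
        w.write(RowKind.INSERT, "row2");       // reaches the limit, flushes
        System.out.println(w.flushed);         // [[new, row2]]
    }
}
```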
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
index e37a4d22cd..be8becd327 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriterOptions.java
@@ -38,7 +38,7 @@ public class MongodbWriterOptions implements Serializable {
protected final boolean upsertEnable;
- protected final String[] upsertKey;
+ protected final String[] primaryKey;
protected final int retryMax;
@@ -51,7 +51,7 @@ public class MongodbWriterOptions implements Serializable {
int flushSize,
Long batchIntervalMs,
boolean upsertEnable,
- String[] upsertKey,
+ String[] primaryKey,
int retryMax,
Long retryInterval) {
this.connectString = connectString;
@@ -60,7 +60,7 @@ public class MongodbWriterOptions implements Serializable {
this.flushSize = flushSize;
this.batchIntervalMs = batchIntervalMs;
this.upsertEnable = upsertEnable;
- this.upsertKey = upsertKey;
+ this.primaryKey = primaryKey;
this.retryMax = retryMax;
this.retryInterval = retryInterval;
}
@@ -83,7 +83,7 @@ public class MongodbWriterOptions implements Serializable {
protected boolean upsertEnable;
- protected String[] upsertKey;
+ protected String[] primaryKey;
protected int retryMax;
@@ -119,8 +119,8 @@ public class MongodbWriterOptions implements Serializable {
return this;
}
- public Builder withUpsertKey(String[] upsertKey) {
- this.upsertKey = upsertKey;
+ public Builder withPrimaryKey(String[] primaryKey) {
+ this.primaryKey = primaryKey;
return this;
}
@@ -142,7 +142,7 @@ public class MongodbWriterOptions implements Serializable {
flushSize,
batchIntervalMs,
upsertEnable,
- upsertKey,
+ primaryKey,
retryMax,
retryInterval);
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
index 3f979a8f77..96c097c7fa 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
@@ -26,7 +26,6 @@
<name>SeaTunnel : E2E : Connector V2 : Mongodb</name>
<dependencies>
- <!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-mongodb</artifactId>
@@ -39,5 +38,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
similarity index 52%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
index 245b709e34..4c85c0d097 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
@@ -19,14 +19,11 @@ package org.apache.seatunnel.e2e.connector.v2.mongodb;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.awaitility.Awaitility;
import org.bson.Document;
-import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
@@ -41,163 +38,53 @@ import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Sorts;
import lombok.extern.slf4j.Slf4j;
-import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
@Slf4j
-public class MongodbIT extends TestSuiteBase implements TestResource {
+public abstract class AbstractMongodbIT extends TestSuiteBase implements TestResource {
- private static final Random RANDOM = new Random();
+ protected static final Random RANDOM = new Random();
-    private static final List<Document> TEST_MATCH_DATASET = generateTestDataSet(5);
+    protected static final List<Document> TEST_MATCH_DATASET = generateTestDataSet(5);
-    private static final List<Document> TEST_SPLIT_DATASET = generateTestDataSet(10);
+    protected static final List<Document> TEST_SPLIT_DATASET = generateTestDataSet(10);
- private static final String MONGODB_IMAGE = "mongo:latest";
+ protected static final String MONGODB_IMAGE = "mongo:latest";
- private static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
+ protected static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
- private static final int MONGODB_PORT = 27017;
+ protected static final int MONGODB_PORT = 27017;
- private static final String MONGODB_DATABASE = "test_db";
+ protected static final String MONGODB_DATABASE = "test_db";
- private static final String MONGODB_MATCH_TABLE = "test_match_op_db";
+ protected static final String MONGODB_MATCH_TABLE = "test_match_op_db";
- private static final String MONGODB_SPLIT_TABLE = "test_split_op_db";
+ protected static final String MONGODB_SPLIT_TABLE = "test_split_op_db";
-    private static final String MONGODB_MATCH_RESULT_TABLE = "test_match_op_result_db";
+    protected static final String MONGODB_MATCH_RESULT_TABLE = "test_match_op_result_db";
-    private static final String MONGODB_SPLIT_RESULT_TABLE = "test_split_op_result_db";
+    protected static final String MONGODB_SPLIT_RESULT_TABLE = "test_split_op_result_db";
-    private static final String MONGODB_SINK_TABLE = "test_source_sink_table";
+    protected static final String MONGODB_SINK_TABLE = "test_source_sink_table";
- private static final String MONGODB_UPDATE_TABLE = "test_update_table";
+ protected static final String MONGODB_UPDATE_TABLE = "test_update_table";
- private static final String MONGODB_FLAT_TABLE = "test_flat_table";
+ protected static final String MONGODB_FLAT_TABLE = "test_flat_table";
- private GenericContainer<?> mongodbContainer;
+ protected static final String MONGODB_CDC_RESULT_TABLE = "test_cdc_table";
- private MongoClient client;
+ protected GenericContainer<?> mongodbContainer;
-    @TestTemplate
-    public void testMongodbSourceAndSink(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult insertResult = container.executeJob("/fake_source_to_mongodb.conf");
-        Assertions.assertEquals(0, insertResult.getExitCode(), insertResult.getStderr());
-
-        Container.ExecResult assertResult = container.executeJob("/mongodb_source_to_assert.conf");
-        Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());
-        clearDate(MONGODB_SINK_TABLE);
-    }
-
-    @TestTemplate
-    public void testMongodbSourceMatch(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult queryResult =
-                container.executeJob("/matchIT/mongodb_matchQuery_source_to_assert.conf");
-        Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr());
-
-        Assertions.assertIterableEquals(
-                TEST_MATCH_DATASET.stream()
-                        .filter(x -> x.get("c_int").equals(2))
-                        .peek(e -> e.remove("_id"))
-                        .collect(Collectors.toList()),
-                readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
-                        .peek(e -> e.remove("_id"))
-                        .collect(Collectors.toList()));
-        clearDate(MONGODB_MATCH_RESULT_TABLE);
-
-        Container.ExecResult projectionResult =
-                container.executeJob("/matchIT/mongodb_matchProjection_source_to_assert.conf");
-        Assertions.assertEquals(0, projectionResult.getExitCode(), projectionResult.getStderr());
-
-        Assertions.assertIterableEquals(
-                TEST_MATCH_DATASET.stream()
-                        .map(Document::new)
-                        .peek(document -> document.remove("c_bigint"))
-                        .peek(e -> e.remove("_id"))
-                        .collect(Collectors.toList()),
-                readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
-                        .peek(e -> e.remove("_id"))
-                        .collect(Collectors.toList()));
-        clearDate(MONGODB_MATCH_RESULT_TABLE);
-    }
-
-    @TestTemplate
-    public void testFakeSourceToUpdateMongodb(TestContainer container)
-            throws IOException, InterruptedException {
-
-        Container.ExecResult insertResult =
-                container.executeJob("/updateIT/fake_source_to_updateMode_insert_mongodb.conf");
-        Assertions.assertEquals(0, insertResult.getExitCode(), insertResult.getStderr());
-
-        Container.ExecResult updateResult =
-                container.executeJob("/updateIT/fake_source_to_update_mongodb.conf");
-        Assertions.assertEquals(0, updateResult.getExitCode(), updateResult.getStderr());
-
-        Container.ExecResult assertResult =
-                container.executeJob("/updateIT/update_mongodb_to_assert.conf");
-        Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());
-
-        clearDate(MONGODB_UPDATE_TABLE);
-    }
-
-    @TestTemplate
-    public void testFlatSyncString(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult insertResult =
-                container.executeJob("/flatIT/fake_source_to_flat_mongodb.conf");
-        Assertions.assertEquals(0, insertResult.getExitCode(), insertResult.getStderr());
-
-        Container.ExecResult assertResult =
-                container.executeJob("/flatIT/mongodb_flat_source_to_assert.conf");
-        Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());
-
-        clearDate(MONGODB_FLAT_TABLE);
-    }
-
-    @TestTemplate
-    public void testMongodbSourceSplit(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult queryResult =
-                container.executeJob("/splitIT/mongodb_split_key_source_to_assert.conf");
-        Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr());
-
-        Assertions.assertIterableEquals(
-                TEST_SPLIT_DATASET.stream()
-                        .map(Document::new)
-                        .peek(e -> e.remove("_id"))
-                        .collect(Collectors.toList()),
-                readMongodbData(MONGODB_SPLIT_RESULT_TABLE).stream()
-                        .peek(e -> e.remove("_id"))
-                        .collect(Collectors.toList()));
-        clearDate(MONGODB_SPLIT_RESULT_TABLE);
-
-        Container.ExecResult projectionResult =
-                container.executeJob("/splitIT/mongodb_split_size_source_to_assert.conf");
-        Assertions.assertEquals(0, projectionResult.getExitCode(), projectionResult.getStderr());
-
-        Assertions.assertIterableEquals(
-                TEST_SPLIT_DATASET.stream()
-                        .map(Document::new)
-                        .peek(e -> e.remove("_id"))
-                        .collect(Collectors.toList()),
-                readMongodbData(MONGODB_SPLIT_RESULT_TABLE).stream()
-                        .peek(e -> e.remove("_id"))
-                        .collect(Collectors.toList()));
-        clearDate(MONGODB_SPLIT_RESULT_TABLE);
-    }
+ protected MongoClient client;
public void initConnection() {
String host = mongodbContainer.getContainerIpAddress();
@@ -206,58 +93,24 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
client = MongoClients.create(url);
}
- private void initSourceData() {
+ protected void initSourceData() {
MongoCollection<Document> sourceMatchTable =
- client.getDatabase(MongodbIT.MONGODB_DATABASE)
- .getCollection(MongodbIT.MONGODB_MATCH_TABLE);
+ client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_MATCH_TABLE);
sourceMatchTable.deleteMany(new Document());
- sourceMatchTable.insertMany(MongodbIT.TEST_MATCH_DATASET);
+ sourceMatchTable.insertMany(TEST_MATCH_DATASET);
MongoCollection<Document> sourceSplitTable =
- client.getDatabase(MongodbIT.MONGODB_DATABASE)
- .getCollection(MongodbIT.MONGODB_SPLIT_TABLE);
+ client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_SPLIT_TABLE);
sourceSplitTable.deleteMany(new Document());
- sourceSplitTable.insertMany(MongodbIT.TEST_SPLIT_DATASET);
+ sourceSplitTable.insertMany(TEST_SPLIT_DATASET);
}
- private void clearDate(String table) {
+ protected void clearDate(String table) {
client.getDatabase(MONGODB_DATABASE).getCollection(table).drop();
}
- @BeforeAll
- @Override
- public void startUp() {
- DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
- mongodbContainer =
- new GenericContainer<>(imageName)
- .withNetwork(NETWORK)
- .withNetworkAliases(MONGODB_CONTAINER_HOST)
- .withExposedPorts(MONGODB_PORT)
- .waitingFor(
- new HttpWaitStrategy()
- .forPort(MONGODB_PORT)
- .forStatusCodeMatching(
- response ->
- response == HTTP_OK
- || response == HTTP_UNAUTHORIZED)
- .withStartupTimeout(Duration.ofMinutes(2)))
- .withLogConsumer(
- new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
- mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
- Startables.deepStart(Stream.of(mongodbContainer)).join();
- log.info("Mongodb container started");
-
- Awaitility.given()
- .ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::initConnection);
- this.initSourceData();
- }
-
public static List<Document> generateTestDataSet(int count) {
List<Document> dataSet = new ArrayList<>();
@@ -311,7 +164,7 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
return dataSet;
}
- private static String randomString() {
+ protected static String randomString() {
int length = RANDOM.nextInt(10) + 1;
StringBuilder sb = new StringBuilder(length);
for (int i = 0; i < length; i++) {
@@ -321,7 +174,7 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
return sb.toString();
}
- private List<Document> readMongodbData(String collection) {
+ protected List<Document> readMongodbData(String collection) {
MongoCollection<Document> sinkTable =
client.getDatabase(MONGODB_DATABASE).getCollection(collection);
MongoCursor<Document> cursor = sinkTable.find().sort(Sorts.ascending("c_int")).cursor();
@@ -332,6 +185,40 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
return documents;
}
+ @BeforeAll
+ @Override
+ public void startUp() {
+ DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+ mongodbContainer =
+ new GenericContainer<>(imageName)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MONGODB_CONTAINER_HOST)
+ .withExposedPorts(MONGODB_PORT)
+ .waitingFor(
+ new HttpWaitStrategy()
+ .forPort(MONGODB_PORT)
+ .forStatusCodeMatching(
+ response ->
+ response == HTTP_OK
+ || response == HTTP_UNAUTHORIZED)
+ .withStartupTimeout(Duration.ofMinutes(2)))
+ .withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
+ // For local test use
+ // mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
+ Startables.deepStart(Stream.of(mongodbContainer)).join();
+ log.info("Mongodb container started");
+
+ Awaitility.given()
+ .ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::initConnection);
+ this.initSourceData();
+ }
+
+ @AfterAll
@Override
public void tearDown() {
if (client != null) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java
new file mode 100644
index 0000000000..6db7db4fa1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbCDCIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.v2.mongodb;
+
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.bson.Document;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Spark engine will lose the row kind of record")
+@Slf4j
+public class MongodbCDCIT extends AbstractMongodbIT {
+
+ @TestTemplate
+ public void testMongodbCDCUpsertSink(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult queryResult =
+ container.executeJob("/cdcIT/fake_cdc_upsert_sink_mongodb.conf");
+ Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr());
+ Assertions.assertIterableEquals(
+ Stream.<List<Object>>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100))
+ .collect(Collectors.toList()),
+ readMongodbData(MONGODB_CDC_RESULT_TABLE).stream()
+ .peek(e -> e.remove("_id"))
+ .map(Document::entrySet)
+ .map(Set::stream)
+ .map(
+ entryStream ->
+ entryStream
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toCollection(ArrayList::new)))
+ .collect(Collectors.toList()));
+ clearDate(MONGODB_CDC_RESULT_TABLE);
+ }
+
+ @TestTemplate
+ public void testMongodbCDCSink(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult queryResult =
+ container.executeJob("/cdcIT/fake_cdc_sink_mongodb.conf");
+ Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr());
+ Assertions.assertIterableEquals(
+ Stream.<List<Object>>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100))
+ .collect(Collectors.toList()),
+ readMongodbData(MONGODB_CDC_RESULT_TABLE).stream()
+ .peek(e -> e.remove("_id"))
+ .map(Document::entrySet)
+ .map(Set::stream)
+ .map(
+ entryStream ->
+ entryStream
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toCollection(ArrayList::new)))
+ .collect(Collectors.toList()));
+ clearDate(MONGODB_CDC_RESULT_TABLE);
+ }
+}
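Both tests above verify values only: each document read back from the sink collection is sorted by `c_int`, has its server-generated `_id` stripped, and is flattened to its entry values before comparison. The same pipeline on plain maps, as a stdlib-only sketch (`LinkedHashMap` stands in for the driver's `Document`, which likewise preserves insertion order; the class and method names here are illustrative, not part of the connector):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DocToValuesSketch {

    // Mirrors the test pipeline: drop the generated _id, keep the remaining entry values in order.
    public static List<List<Object>> toValueLists(List<Map<String, Object>> docs) {
        return docs.stream()
                .peek(d -> d.remove("_id")) // mutates each map, exactly as the test does per Document
                .map(Map::entrySet)
                .map(entries -> entries.stream()
                        .map(Map.Entry::getValue)
                        .collect(Collectors.toCollection(ArrayList<Object>::new)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", "000000000000000000000000"); // placeholder ObjectId string
        doc.put("c_int", 1L);
        doc.put("name", "A_1");
        doc.put("score", 100);
        List<Map<String, Object>> docs = new ArrayList<>();
        docs.add(doc);
        System.out.println(toValueLists(docs)); // [[1, A_1, 100]]
    }
}
```

Comparing value lists rather than whole documents keeps the assertion independent of the `_id` values MongoDB assigns at write time.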
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 245b709e34..ce25b2062b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -17,78 +17,20 @@
package org.apache.seatunnel.e2e.connector.v2.mongodb;
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.awaitility.Awaitility;
import org.bson.Document;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
@Slf4j
-public class MongodbIT extends TestSuiteBase implements TestResource {
-
- private static final Random RANDOM = new Random();
-
- private static final List<Document> TEST_MATCH_DATASET = generateTestDataSet(5);
-
- private static final List<Document> TEST_SPLIT_DATASET = generateTestDataSet(10);
-
- private static final String MONGODB_IMAGE = "mongo:latest";
-
- private static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
-
- private static final int MONGODB_PORT = 27017;
-
- private static final String MONGODB_DATABASE = "test_db";
-
- private static final String MONGODB_MATCH_TABLE = "test_match_op_db";
-
- private static final String MONGODB_SPLIT_TABLE = "test_split_op_db";
-
- private static final String MONGODB_MATCH_RESULT_TABLE = "test_match_op_result_db";
-
- private static final String MONGODB_SPLIT_RESULT_TABLE = "test_split_op_result_db";
-
- private static final String MONGODB_SINK_TABLE = "test_source_sink_table";
-
- private static final String MONGODB_UPDATE_TABLE = "test_update_table";
-
- private static final String MONGODB_FLAT_TABLE = "test_flat_table";
-
- private GenericContainer<?> mongodbContainer;
-
- private MongoClient client;
+public class MongodbIT extends AbstractMongodbIT {
@TestTemplate
public void testMongodbSourceAndSink(TestContainer container)
@@ -198,147 +140,4 @@ public class MongodbIT extends TestSuiteBase implements TestResource {
.collect(Collectors.toList()));
clearDate(MONGODB_SPLIT_RESULT_TABLE);
}
-
- public void initConnection() {
- String host = mongodbContainer.getContainerIpAddress();
- int port = mongodbContainer.getFirstMappedPort();
- String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
- client = MongoClients.create(url);
- }
-
- private void initSourceData() {
- MongoCollection<Document> sourceMatchTable =
- client.getDatabase(MongodbIT.MONGODB_DATABASE)
- .getCollection(MongodbIT.MONGODB_MATCH_TABLE);
-
- sourceMatchTable.deleteMany(new Document());
- sourceMatchTable.insertMany(MongodbIT.TEST_MATCH_DATASET);
-
- MongoCollection<Document> sourceSplitTable =
- client.getDatabase(MongodbIT.MONGODB_DATABASE)
- .getCollection(MongodbIT.MONGODB_SPLIT_TABLE);
-
- sourceSplitTable.deleteMany(new Document());
- sourceSplitTable.insertMany(MongodbIT.TEST_SPLIT_DATASET);
- }
-
- private void clearDate(String table) {
- client.getDatabase(MONGODB_DATABASE).getCollection(table).drop();
- }
-
- @BeforeAll
- @Override
- public void startUp() {
- DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
- mongodbContainer =
- new GenericContainer<>(imageName)
- .withNetwork(NETWORK)
- .withNetworkAliases(MONGODB_CONTAINER_HOST)
- .withExposedPorts(MONGODB_PORT)
- .waitingFor(
- new HttpWaitStrategy()
- .forPort(MONGODB_PORT)
- .forStatusCodeMatching(
- response ->
- response == HTTP_OK
- || response == HTTP_UNAUTHORIZED)
- .withStartupTimeout(Duration.ofMinutes(2)))
- .withLogConsumer(
- new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
- mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
- Startables.deepStart(Stream.of(mongodbContainer)).join();
- log.info("Mongodb container started");
-
- Awaitility.given()
- .ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::initConnection);
- this.initSourceData();
- }
-
- public static List<Document> generateTestDataSet(int count) {
- List<Document> dataSet = new ArrayList<>();
-
- for (int i = 0; i < count; i++) {
- dataSet.add(
- new Document(
- "c_map",
- new Document("OQBqH", randomString())
- .append("rkvlO", randomString())
- .append("pCMEX", randomString())
- .append("DAgdj", randomString())
- .append("dsJag", randomString()))
- .append(
- "c_array",
- Arrays.asList(
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt()))
- .append("c_string", randomString())
- .append("c_boolean", RANDOM.nextBoolean())
- .append("c_int", i)
- .append("c_bigint", RANDOM.nextLong())
- .append("c_double", RANDOM.nextDouble() * Double.MAX_VALUE)
- .append(
- "c_row",
- new Document(
- "c_map",
- new Document("OQBqH", randomString())
- .append("rkvlO", randomString())
- .append("pCMEX", randomString())
- .append("DAgdj", randomString())
- .append("dsJag", randomString()))
- .append(
- "c_array",
- Arrays.asList(
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt(),
- RANDOM.nextInt()))
- .append("c_string", randomString())
- .append("c_boolean", RANDOM.nextBoolean())
- .append("c_int", RANDOM.nextInt())
- .append("c_bigint", RANDOM.nextLong())
- .append(
- "c_double",
- RANDOM.nextDouble() * Double.MAX_VALUE)));
- }
- return dataSet;
- }
-
- private static String randomString() {
- int length = RANDOM.nextInt(10) + 1;
- StringBuilder sb = new StringBuilder(length);
- for (int i = 0; i < length; i++) {
- char c = (char) (RANDOM.nextInt(26) + 'a');
- sb.append(c);
- }
- return sb.toString();
- }
-
- private List<Document> readMongodbData(String collection) {
- MongoCollection<Document> sinkTable =
- client.getDatabase(MONGODB_DATABASE).getCollection(collection);
- MongoCursor<Document> cursor = sinkTable.find().sort(Sorts.ascending("c_int")).cursor();
- List<Document> documents = new ArrayList<>();
- while (cursor.hasNext()) {
- documents.add(cursor.next());
- }
- return documents;
- }
-
- @Override
- public void tearDown() {
- if (client != null) {
- client.close();
- }
- if (mongodbContainer != null) {
- mongodbContainer.close();
- }
- }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/cdcIT/fake_cdc_sink_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/cdcIT/fake_cdc_sink_mongodb.conf
new file mode 100644
index 0000000000..dd13e555cc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/cdcIT/fake_cdc_sink_mongodb.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_int = bigint
+ name = string
+ score = int
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
+ database = "test_db"
+ collection = "test_cdc_table"
+ primary-key = ["c_int"]
+ schema = {
+ fields {
+ c_int = int
+ name = string
+ score = int
+ }
+ }
+ }
+}
\ No newline at end of file
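The row stream in the config above reduces to exactly the two documents that `MongodbCDCIT.testMongodbCDCSink` asserts: UPDATE_AFTER replaces the row with key 1 and DELETE removes the row with key 2. A stdlib-only sketch of that reduction, keyed on the `primary-key` column `c_int` (the `RowKind` enum and `apply` helper are illustrative names for this sketch, not the connector's actual classes):

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CdcReduceSketch {

    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Reduce a CDC row stream into final per-key state, keyed on the primary key c_int.
    public static Map<Long, List<Object>> apply(List<Map.Entry<RowKind, List<Object>>> stream) {
        Map<Long, List<Object>> state = new LinkedHashMap<>();
        for (Map.Entry<RowKind, List<Object>> change : stream) {
            Long key = (Long) change.getValue().get(0);
            switch (change.getKey()) {
                case INSERT:
                case UPDATE_AFTER:
                    state.put(key, change.getValue()); // write/replace by key
                    break;
                case DELETE:
                    state.remove(key);
                    break;
                default: // UPDATE_BEFORE is superseded by its UPDATE_AFTER; no-op here
                    break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<RowKind, List<Object>>> rows = Arrays.asList(
                new AbstractMap.SimpleEntry<>(RowKind.INSERT, Arrays.<Object>asList(1L, "A", 100)),
                new AbstractMap.SimpleEntry<>(RowKind.INSERT, Arrays.<Object>asList(2L, "B", 100)),
                new AbstractMap.SimpleEntry<>(RowKind.INSERT, Arrays.<Object>asList(3L, "C", 100)),
                new AbstractMap.SimpleEntry<>(RowKind.UPDATE_BEFORE, Arrays.<Object>asList(1L, "A", 100)),
                new AbstractMap.SimpleEntry<>(RowKind.UPDATE_AFTER, Arrays.<Object>asList(1L, "A_1", 100)),
                new AbstractMap.SimpleEntry<>(RowKind.DELETE, Arrays.<Object>asList(2L, "B", 100)));
        System.out.println(apply(rows).values()); // [[1, A_1, 100], [3, C, 100]]
    }
}
```

This is why the Spark engine is excluded via `@DisabledOnContainer`: if the row kind is lost, every row degrades to INSERT and the reduction above no longer holds.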
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/cdcIT/fake_cdc_upsert_sink_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/cdcIT/fake_cdc_upsert_sink_mongodb.conf
new file mode 100644
index 0000000000..d072f8548e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/cdcIT/fake_cdc_upsert_sink_mongodb.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_int = bigint
+ name = string
+ score = int
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ MongoDB {
+ uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
+ database = "test_db"
+ collection = "test_cdc_table"
+ upsert-enable = true
+ primary-key = ["c_int"]
+ schema = {
+ fields {
+ c_int = int
+ name = string
+ score = int
+ }
+ }
+ }
+}
\ No newline at end of file
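The difference in this config is `upsert-enable = true`: writes become replace-by-primary-key with upsert rather than plain inserts, so the four duplicate INSERT rows for key 3 collapse into a single document, and the test still ends with only `[1, "A_1", 100]` and `[3, "C", 100]`. A stdlib-only sketch of that idempotency (class and method names are illustrative; the real sink issues MongoDB replace-with-upsert writes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertSketch {

    // Plain insert mode: appends blindly, so duplicate keys produce duplicate documents.
    public static List<Map<String, Object>> insertOnly(List<Map<String, Object>> rows) {
        return new ArrayList<>(rows);
    }

    // Upsert mode: replace the document matching the primary key, or insert if absent.
    public static Collection<Map<String, Object>> upsert(
            List<Map<String, Object>> rows, String primaryKey) {
        Map<Object, Map<String, Object>> byKey = new LinkedHashMap<>();
        for (Map<String, Object> row : rows) {
            byKey.put(row.get(primaryKey), row); // idempotent per key
        }
        return byKey.values();
    }

    public static Map<String, Object> row(long cInt, String name, int score) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("c_int", cInt);
        m.put("name", name);
        m.put("score", score);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> dupes = Arrays.asList(
                row(3, "C", 100), row(3, "C", 100), row(3, "C", 100), row(3, "C", 100));
        System.out.println(insertOnly(dupes).size()); // 4: duplicates without upsert
        System.out.println(upsert(dupes, "c_int").size()); // 1: collapsed by primary key
    }
}
```

This also explains why `primary-key` is required alongside `upsert-enable`: without a key to match on, there is nothing for the replace filter to target.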
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb.conf
index 3ea58f960c..c44c2add1c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/fake_source_to_mongodb.conf
@@ -67,7 +67,6 @@ sink {
uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
database = "test_db"
collection = "test_source_sink_table"
- transaction-enable = true
schema = {
fields {
c_map = "map<string, string>"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_updateMode_insert_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_updateMode_insert_mongodb.conf
index 0ecc9dfb36..527de9bc73 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_updateMode_insert_mongodb.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_updateMode_insert_mongodb.conf
@@ -68,7 +68,7 @@ sink {
database = "test_db"
collection = "test_update_table"
upsert-enable = true
- upsert-key = ["c_string"]
+ primary-key = ["c_string"]
source_table_name = "mongodb_table"
schema = {
fields {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_update_mongodb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_update_mongodb.conf
index c210e9eabf..d34da66def 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_update_mongodb.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/updateIT/fake_source_to_update_mongodb.conf
@@ -68,7 +68,7 @@ sink {
database = "test_db"
collection = "test_update_table"
upsert-enable = true
- upsert-key = ["c_int"]
+ primary-key = ["c_int"]
source_table_name = "mongodb_table"
schema = {
fields {