This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8c569b1541 [Feature][CDC] Support MongoDB CDC running on flink (#5644)
8c569b1541 is described below
commit 8c569b1541e2c2358cfb3357b8814bfc0415b6ec
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Oct 17 18:25:39 2023 +0800
[Feature][CDC] Support MongoDB CDC running on flink (#5644)
---------
Co-authored-by: zhouyao <[email protected]>
---
docs/en/connector-v2/source/MongoDB-CDC.md | 1 +
.../MongoDBConnectorDeserializationSchema.java | 11 ++------
.../src/test/java/mongodb/MongodbCDCIT.java | 30 ++++++++++++++++++++--
.../src/test/resources/ddl/inventoryClean.js | 16 ++++++++++++
4 files changed, 47 insertions(+), 11 deletions(-)
diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md
b/docs/en/connector-v2/source/MongoDB-CDC.md
index d78f70110f..17fe09e3b2 100644
--- a/docs/en/connector-v2/source/MongoDB-CDC.md
+++ b/docs/en/connector-v2/source/MongoDB-CDC.md
@@ -5,6 +5,7 @@
## Support Those Engines
> SeaTunnel Zeta<br/>
+> Flink<br/>
## Key Features
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index 6f36f4be83..4df666d2ad 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -64,7 +64,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
-import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@@ -101,11 +100,7 @@ public class MongoDBConnectorDeserializationSchema
emit(record, insert, out);
break;
case DELETE:
- SeaTunnelRow delete =
- new SeaTunnelRow(
- new Object[] {
-
documentKey.get(ID_FIELD).asObjectId().getValue().toString()
- });
+ SeaTunnelRow delete = extractRowData(documentKey);
delete.setRowKind(RowKind.DELETE);
emit(record, delete, out);
break;
@@ -190,9 +185,7 @@ public class MongoDBConnectorDeserializationSchema
@Override
public Object apply(BsonValue bsonValue) {
if (isBsonValueNull(bsonValue) || isBsonDecimalNaN(bsonValue))
{
- throw new MongodbConnectorException(
- UNSUPPORTED_OPERATION,
- "Unable to convert to <" + type + "> from nullable
value " + bsonValue);
+ return null;
}
return internalConverter.apply(bsonValue);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index c01b36ef18..01a4c0a0f5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -65,8 +65,8 @@ import static org.awaitility.Awaitility.await;
@Slf4j
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK and FLINK do not support cdc")
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
public class MongodbCDCIT extends TestSuiteBase implements TestResource {
//
----------------------------------------------------------------------------
@@ -182,6 +182,28 @@ public class MongodbCDCIT extends TestSuiteBase implements
TestResource {
// insert update delete
upsertDeleteSourceTable();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ readMongodbData().stream()
+ .peek(e -> e.remove("_id"))
+ .map(Document::entrySet)
+ .map(Set::stream)
+ .map(
+ entryStream ->
+ entryStream
+
.map(Map.Entry::getValue)
+ .collect(
+
Collectors.toCollection(
+
ArrayList
+
::new)))
+ .collect(Collectors.toList()),
+ querySql());
+ });
+
+ cleanSourceTable();
+
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
@@ -233,6 +255,10 @@ public class MongodbCDCIT extends TestSuiteBase implements
TestResource {
mongodbContainer.executeCommandFileInDatabase("inventoryDDL",
MONGODB_DATABASE);
}
+ private void cleanSourceTable() { // Runs the "inventoryClean" command file against MONGODB_DATABASE; presumably resolves to ddl/inventoryClean.js, which deletes every 'products' document — confirm.
+ mongodbContainer.executeCommandFileInDatabase("inventoryClean",
MONGODB_DATABASE);
+ }
+
public void initConnection() {
String ipAddress = mongodbContainer.getHost();
Integer port = mongodbContainer.getFirstMappedPort();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
new file mode 100644
index 0000000000..fbbb0ea0df
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/ddl/inventoryClean.js
@@ -0,0 +1,16 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+db.getCollection('products').deleteMany({}) // Empty filter matches all documents: removes every document from 'products' in the current db (presumably so the CDC test observes DELETE events — confirm).