This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 35592201c [FLINK-38452] Fix unable to read incremental data from
MongoDB collections with dots (`.`) (#4148)
35592201c is described below
commit 35592201cc3989723dffb7fcc54786b4214e4988
Author: yuxiqian <[email protected]>
AuthorDate: Tue Sep 30 13:03:18 2025 +0800
[FLINK-38452] Fix unable to read incremental data from MongoDB collections
with dots (`.`) (#4148)
Signed-off-by: yuxiqian <[email protected]>
---
.../mongodb/source/dialect/MongoDBDialect.java | 21 +--------
.../mongodb/source/utils/MongoUtils.java | 27 ++++++++++-
.../mongodb/table/MongoDBRegexFilterITCase.java | 54 +++++++++++++++++-----
3 files changed, 70 insertions(+), 32 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
index d6ce3a9a1..dd6b8051b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
@@ -31,6 +31,7 @@ import
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBFetchT
import
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask;
import
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask;
import
org.apache.flink.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils.CollectionDiscoveryInfo;
+import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import com.mongodb.client.MongoClient;
import io.debezium.relational.Column;
@@ -74,29 +75,11 @@ public class MongoDBDialect implements
DataSourceDialect<MongoDBSourceConfig> {
return "MongoDB";
}
- private static TableId parseTableId(String str) {
- return parseTableId(str, true);
- }
-
- private static TableId parseTableId(String str, boolean
useCatalogBeforeSchema) {
- String[] parts = str.split("[.]", 2);
- int numParts = parts.length;
- if (numParts == 1) {
- return new TableId(null, null, parts[0]);
- } else if (numParts == 2) {
- return useCatalogBeforeSchema
- ? new TableId(parts[0], null, parts[1])
- : new TableId(null, parts[0], parts[1]);
- } else {
- return null;
- }
- }
-
@Override
public List<TableId> discoverDataCollections(MongoDBSourceConfig
sourceConfig) {
CollectionDiscoveryInfo discoveryInfo =
discoverAndCacheDataCollections(sourceConfig);
return discoveryInfo.getDiscoveredCollections().stream()
- .map(MongoDBDialect::parseTableId)
+ // MongoUtils.parseTableId splits each "database.collection" string at its first dot
+ // only, so a collection name that itself contains dots is kept intact.
+ .map(MongoUtils::parseTableId)
.collect(Collectors.toList());
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
index 968833023..949030667 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
@@ -101,6 +101,31 @@ public class MongoUtils {
private MongoUtils() {}
+ /**
+ * Parses a canonical {@code database.collection} string into a {@link TableId} whose catalog
+ * is the database name.
+ *
+ * <p>MongoDB allows dots to be present in collection names, but not in database names (see <a
+ * href="https://www.mongodb.com/docs/manual/reference/limits/">docs</a> for more details). So,
+ * for a canonical TableId with multiple dots in it (like {@code foo.bar.baz}), it is guaranteed
+ * that the first part ({@code foo}) is the database name and the remaining part ({@code
+ * bar.baz}) is the collection (table) name.
+ *
+ * @param str a {@code database.collection} string, or a bare name without any dot
+ * @return the parsed {@link TableId}; equivalent to {@code parseTableId(str, true)}
+ */
+ public static TableId parseTableId(String str) {
+ return parseTableId(str, true);
+ }
+
+ /**
+ * Splits {@code str} at its first dot only, so any further dots stay in the table name.
+ *
+ * @param str a {@code database.collection} string, or a bare name without any dot
+ * @param useCatalogBeforeSchema if {@code true}, the part before the first dot becomes the
+ *     catalog of the returned {@link TableId}; otherwise it becomes the schema
+ * @return a {@link TableId} whose table name is everything after the first dot, or the whole
+ *     string (with null catalog and schema) when {@code str} contains no dot
+ */
+ public static TableId parseTableId(String str, boolean
useCatalogBeforeSchema) {
+ // Limit 2: split only at the first dot; database names cannot contain dots.
+ String[] parts = str.split("[.]", 2);
+ int numParts = parts.length;
+ if (numParts == 1) {
+ return new TableId(null, null, parts[0]);
+ } else if (numParts == 2) {
+ return useCatalogBeforeSchema
+ ? new TableId(parts[0], null, parts[1])
+ : new TableId(null, parts[0], parts[1]);
+ } else {
+ // Not reachable: split with a limit of 2 never returns more than two parts.
+ return null;
+ }
+ }
+
public static ChangeStreamDescriptor getChangeStreamDescriptor(
MongoDBSourceConfig sourceConfig,
List<String> discoveredDatabases,
@@ -115,7 +140,7 @@ public class MongoUtils {
collectionList, discoveredCollections)) {
changeStreamFilter =
ChangeStreamDescriptor.collection(
- TableId.parse(discoveredCollections.get(0)));
+ parseTableId(discoveredCollections.get(0)));
} else {
Pattern namespaceRegex =
CollectionDiscoveryUtils.includeListAsFlatPattern(collectionList);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
index 2a2e37588..698218379 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
@@ -30,6 +30,7 @@ import org.bson.Document;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Arrays;
import java.util.List;
import static
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -264,27 +265,56 @@ class MongoDBRegexFilterITCase extends
MongoDBSourceTestBase {
+ /** Matches the dotted collection with escaped dots: {@code db + "[.]coll[.]name"}. */
@ParameterizedTest(name = "parallelismSnapshot: {0}")
@ValueSource(booleans = {true, false})
- void testMatchCollectionWithDots(boolean parallelismSnapshot) throws
Exception {
+ void testMatchCollectionWithQuotedDots(boolean parallelismSnapshot) throws
Exception {
+ testMatchCollectionWithDots(parallelismSnapshot, "[.]coll[.]name");
+ }
+
+ /** Matches the dotted collection with unescaped dots: {@code db + ".coll.name"}. */
+ @ParameterizedTest(name = "parallelismSnapshot: {0}")
+ @ValueSource(booleans = {true, false})
+ void testMatchCollectionWithUnquotedDots(boolean parallelismSnapshot)
throws Exception {
+ testMatchCollectionWithDots(parallelismSnapshot, ".coll.name");
+ }
+
+ /**
+ * Submits a job filtering on {@code db + matchExpr} for the {@code coll.name} collection of the
+ * "ns-dotted" database, checks the three initial records, then inserts three more documents and
+ * checks that they reach the sink as well.
+ *
+ * @param parallelismSnapshot passed to {@code setup} and {@code submitTestCase}
+ * @param matchExpr collection-filter suffix appended to the database name
+ */
+ private void testMatchCollectionWithDots(boolean parallelismSnapshot,
String matchExpr)
+ throws Exception {
setup(parallelismSnapshot);
- // 1. Given colllections:
+
+ // 1. Given collections:
// db: [coll.name]
String db =
MONGO_CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted");
- TableResult result = submitTestCase(db, db + "[.]coll[.]name",
parallelismSnapshot);
+ TableResult result = submitTestCase(db, db + matchExpr,
parallelismSnapshot);
// 2. Wait change stream records come
waitForSinkSize("mongodb_sink", 3);
// 3. Check results
- String[] expected =
- new String[] {
- String.format("+I[%s, coll.name, A101]", db),
- String.format("+I[%s, coll.name, A102]", db),
- String.format("+I[%s, coll.name, A103]", db)
- };
-
- List<String> actual =
TestValuesTableFactory.getRawResultsAsStrings("mongodb_sink");
- Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
+
Assertions.assertThat(TestValuesTableFactory.getRawResultsAsStrings("mongodb_sink"))
+ .containsExactlyInAnyOrder(
+ String.format("+I[%s, coll.name, A101]", db),
+ String.format("+I[%s, coll.name, A102]", db),
+ String.format("+I[%s, coll.name, A103]", db));
+
+ // 4. Prepare some incremental data
+ // (inserted after the initial records were observed, so these must arrive incrementally)
+ mongodbClient
+ .getDatabase(db)
+ .getCollection("coll.name")
+ .insertMany(
+ Arrays.asList(
+ Document.parse("{\"seq\": \"A104\"}"),
+ Document.parse("{\"seq\": \"A105\"}"),
+ Document.parse("{\"seq\": \"A106\"}")));
+
+ // 5. Check incremental records
+ waitForSinkSize("mongodb_sink", 6);
+
Assertions.assertThat(TestValuesTableFactory.getRawResultsAsStrings("mongodb_sink"))
+ .containsExactlyInAnyOrder(
+ String.format("+I[%s, coll.name, A101]", db),
+ String.format("+I[%s, coll.name, A102]", db),
+ String.format("+I[%s, coll.name, A103]", db),
+ String.format("+I[%s, coll.name, A104]", db),
+ String.format("+I[%s, coll.name, A105]", db),
+ String.format("+I[%s, coll.name, A106]", db));
result.getJobClient().get().cancel().get();
}