This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 35592201c [FLINK-38452] Fix unable to read incremental data from 
MongoDB collections with dots (`.`) (#4148)
35592201c is described below

commit 35592201cc3989723dffb7fcc54786b4214e4988
Author: yuxiqian <[email protected]>
AuthorDate: Tue Sep 30 13:03:18 2025 +0800

    [FLINK-38452] Fix unable to read incremental data from MongoDB collections 
with dots (`.`) (#4148)
    
    Signed-off-by: yuxiqian <[email protected]>
---
 .../mongodb/source/dialect/MongoDBDialect.java     | 21 +--------
 .../mongodb/source/utils/MongoUtils.java           | 27 ++++++++++-
 .../mongodb/table/MongoDBRegexFilterITCase.java    | 54 +++++++++++++++++-----
 3 files changed, 70 insertions(+), 32 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
index d6ce3a9a1..dd6b8051b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBFetchT
 import 
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask;
 import 
org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask;
 import 
org.apache.flink.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils.CollectionDiscoveryInfo;
+import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
 
 import com.mongodb.client.MongoClient;
 import io.debezium.relational.Column;
@@ -74,29 +75,11 @@ public class MongoDBDialect implements 
DataSourceDialect<MongoDBSourceConfig> {
         return "MongoDB";
     }
 
-    private static TableId parseTableId(String str) {
-        return parseTableId(str, true);
-    }
-
-    private static TableId parseTableId(String str, boolean 
useCatalogBeforeSchema) {
-        String[] parts = str.split("[.]", 2);
-        int numParts = parts.length;
-        if (numParts == 1) {
-            return new TableId(null, null, parts[0]);
-        } else if (numParts == 2) {
-            return useCatalogBeforeSchema
-                    ? new TableId(parts[0], null, parts[1])
-                    : new TableId(null, parts[0], parts[1]);
-        } else {
-            return null;
-        }
-    }
-
     @Override
     public List<TableId> discoverDataCollections(MongoDBSourceConfig 
sourceConfig) {
         CollectionDiscoveryInfo discoveryInfo = 
discoverAndCacheDataCollections(sourceConfig);
         return discoveryInfo.getDiscoveredCollections().stream()
-                .map(MongoDBDialect::parseTableId)
+                .map(MongoUtils::parseTableId)
                 .collect(Collectors.toList());
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
index 968833023..949030667 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java
@@ -101,6 +101,31 @@ public class MongoUtils {
 
     private MongoUtils() {}
 
+    /**
+     * MongoDB allows dots to be presented in collection names, but not in 
database names (see <a
+     * href="https://www.mongodb.com/docs/manual/reference/limits/";>docs</a> 
for more details). <br>
+     * So, for a canonical TableId with multiple dots in it (like {@code 
foo.bar.baz}), it is
+     * guaranteed that the first part ({@code foo}) is the database name and 
the latter part ({@code
+     * bar.baz}) is the table name.
+     */
+    public static TableId parseTableId(String str) {
+        return parseTableId(str, true);
+    }
+
+    public static TableId parseTableId(String str, boolean 
useCatalogBeforeSchema) {
+        String[] parts = str.split("[.]", 2);
+        int numParts = parts.length;
+        if (numParts == 1) {
+            return new TableId(null, null, parts[0]);
+        } else if (numParts == 2) {
+            return useCatalogBeforeSchema
+                    ? new TableId(parts[0], null, parts[1])
+                    : new TableId(null, parts[0], parts[1]);
+        } else {
+            return null;
+        }
+    }
+
     public static ChangeStreamDescriptor getChangeStreamDescriptor(
             MongoDBSourceConfig sourceConfig,
             List<String> discoveredDatabases,
@@ -115,7 +140,7 @@ public class MongoUtils {
                     collectionList, discoveredCollections)) {
                 changeStreamFilter =
                         ChangeStreamDescriptor.collection(
-                                TableId.parse(discoveredCollections.get(0)));
+                                parseTableId(discoveredCollections.get(0)));
             } else {
                 Pattern namespaceRegex =
                         
CollectionDiscoveryUtils.includeListAsFlatPattern(collectionList);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
index 2a2e37588..698218379 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java
@@ -30,6 +30,7 @@ import org.bson.Document;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.Arrays;
 import java.util.List;
 
 import static 
org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -264,27 +265,56 @@ class MongoDBRegexFilterITCase extends 
MongoDBSourceTestBase {
 
     @ParameterizedTest(name = "parallelismSnapshot: {0}")
     @ValueSource(booleans = {true, false})
-    void testMatchCollectionWithDots(boolean parallelismSnapshot) throws 
Exception {
+    void testMatchCollectionWithQuotedDots(boolean parallelismSnapshot) throws 
Exception {
+        testMatchCollectionWithDots(parallelismSnapshot, "[.]coll[.]name");
+    }
+
+    @ParameterizedTest(name = "parallelismSnapshot: {0}")
+    @ValueSource(booleans = {true, false})
+    void testMatchCollectionWithUnquotedDots(boolean parallelismSnapshot) 
throws Exception {
+        testMatchCollectionWithDots(parallelismSnapshot, ".coll.name");
+    }
+
+    private void testMatchCollectionWithDots(boolean parallelismSnapshot, 
String matchExpr)
+            throws Exception {
         setup(parallelismSnapshot);
-        // 1. Given colllections:
+
+        // 1. Given collections:
         // db: [coll.name]
         String db = 
MONGO_CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted");
 
-        TableResult result = submitTestCase(db, db + "[.]coll[.]name", 
parallelismSnapshot);
+        TableResult result = submitTestCase(db, db + matchExpr, 
parallelismSnapshot);
 
         // 2. Wait change stream records come
         waitForSinkSize("mongodb_sink", 3);
 
         // 3. Check results
-        String[] expected =
-                new String[] {
-                    String.format("+I[%s, coll.name, A101]", db),
-                    String.format("+I[%s, coll.name, A102]", db),
-                    String.format("+I[%s, coll.name, A103]", db)
-                };
-
-        List<String> actual = 
TestValuesTableFactory.getRawResultsAsStrings("mongodb_sink");
-        Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
+        
Assertions.assertThat(TestValuesTableFactory.getRawResultsAsStrings("mongodb_sink"))
+                .containsExactlyInAnyOrder(
+                        String.format("+I[%s, coll.name, A101]", db),
+                        String.format("+I[%s, coll.name, A102]", db),
+                        String.format("+I[%s, coll.name, A103]", db));
+
+        // 4. Prepare some incremental data
+        mongodbClient
+                .getDatabase(db)
+                .getCollection("coll.name")
+                .insertMany(
+                        Arrays.asList(
+                                Document.parse("{\"seq\": \"A104\"}"),
+                                Document.parse("{\"seq\": \"A105\"}"),
+                                Document.parse("{\"seq\": \"A106\"}")));
+
+        // 5. Check incremental records
+        waitForSinkSize("mongodb_sink", 6);
+        
Assertions.assertThat(TestValuesTableFactory.getRawResultsAsStrings("mongodb_sink"))
+                .containsExactlyInAnyOrder(
+                        String.format("+I[%s, coll.name, A101]", db),
+                        String.format("+I[%s, coll.name, A102]", db),
+                        String.format("+I[%s, coll.name, A103]", db),
+                        String.format("+I[%s, coll.name, A104]", db),
+                        String.format("+I[%s, coll.name, A105]", db),
+                        String.format("+I[%s, coll.name, A106]", db));
 
         result.getJobClient().get().cancel().get();
     }

Reply via email to