This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f8d224b5f [flink] Fix mongodb CDC Ingestion QueryException (#2028)
f8d224b5f is described below

commit f8d224b5f356b7f6a5f230ea5c286195d6f0ee24
Author: monster <[email protected]>
AuthorDate: Tue Sep 19 10:06:12 2023 +0800

    [flink] Fix mongodb CDC Ingestion QueryException (#2028)
---
 .../flink/action/cdc/mongodb/MongodbSchema.java    | 123 ++++++++++++---
 .../cdc/mongodb/MongoDBActionITCaseBase.java       |   5 +
 .../flink/action/cdc/mongodb/MongoDBContainer.java |   4 +
 .../action/cdc/mongodb/MongodbSchemaITCase.java    | 169 +++++++++++++++++++++
 4 files changed, 277 insertions(+), 24 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
index c266c163f..fbf648eeb 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
@@ -21,14 +21,19 @@ package org.apache.paimon.flink.action.cdc.mongodb;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.bson.Document;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,6 +41,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
 
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
 import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
 
@@ -86,43 +93,111 @@ public class MongodbSchema {
         return primaryKeys;
     }
 
+    /**
+     * Utility class for creating a MongoDB schema based on the provided configuration. The schema
+     * can be created in one of the two modes:
+     *
+     * <ul>
+     *   <li><b>SPECIFIED</b>: In this mode, the schema is created based on the explicit column
+     *       names provided in the configuration. The data types for all columns are assumed to be
+     *       STRING.
+     *   <li><b>DYNAMIC</b>: In this mode, the schema is inferred dynamically from the first
+     *       document in the specified MongoDB collection.
+     * </ul>
+     *
+     * <p>The Configuration object passed to the createSchema method should have the necessary
+     * MongoDB configuration properties set, including the host address, database name, collection
+     * name, and optionally, the username and password for authentication. For the SPECIFIED mode,
+     * the field names should also be specified in the configuration.
+     */
     public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) {
         SchemaAcquisitionMode mode = getModeFromConfig(mongodbConfig);
+        String databaseName =
+                Objects.requireNonNull(
+                        mongodbConfig.get(MongoDBSourceOptions.DATABASE),
+                        "Database name cannot be null");
+        String collectionName =
+                Objects.requireNonNull(
+                        mongodbConfig.get(MongoDBSourceOptions.COLLECTION),
+                        "Collection name cannot be null");
+
         switch (mode) {
             case SPECIFIED:
-                return createSchemaFromSpecifiedConfig(mongodbConfig);
+                String[] columnNames =
+                        Objects.requireNonNull(
+                                        mongodbConfig.get(FIELD_NAME), "Field names cannot be null")
+                                .split(",");
+                LinkedHashMap<String, DataType> schemaFields =
+                        generateSchemaFields(Arrays.asList(columnNames));
+                return new MongodbSchema(
+                        databaseName,
+                        collectionName,
+                        schemaFields,
+                        Collections.singletonList(ID_FIELD));
             case DYNAMIC:
-                return createSchemaFromDynamicConfig(mongodbConfig);
+                String hosts =
+                        Objects.requireNonNull(
+                                mongodbConfig.get(MongoDBSourceOptions.HOSTS),
+                                "Hosts cannot be null");
+
+                MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
+
+                settingsBuilder.applyConnectionString(
+                        new ConnectionString(
+                                buildConnectionString(
+                                        mongodbConfig.get(MongoDBSourceOptions.USERNAME),
+                                        mongodbConfig.get(MongoDBSourceOptions.PASSWORD),
+                                        mongodbConfig.get(MongoDBSourceOptions.SCHEME),
+                                        hosts,
+                                        mongodbConfig.get(
+                                                MongoDBSourceOptions.CONNECTION_OPTIONS))));
+
+                MongoClientSettings settings = settingsBuilder.build();
+
+                try (MongoClient mongoClient = MongoClients.create(settings)) {
+                    MongoDatabase database = mongoClient.getDatabase(databaseName);
+                    MongoCollection<Document> collection = database.getCollection(collectionName);
+                    Document firstDocument = collection.find().first();
+
+                    if (firstDocument == null) {
+                        throw new IllegalStateException(
+                                "No documents in collection to infer schema");
+                    }
+
+                    return createMongodbSchema(
+                            databaseName, collectionName, getColumnNames(firstDocument));
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            "Failed to create schema from MongoDB collection", e);
+                }
             default:
                 throw new IllegalArgumentException("Unsupported schema acquisition mode: " + mode);
         }
     }
 
-    private static SchemaAcquisitionMode getModeFromConfig(Configuration mongodbConfig) {
-        return SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
-    }
+    public static String buildConnectionString(
+            @Nullable String username,
+            @Nullable String password,
+            String scheme,
+            String hosts,
+            @Nullable String connectionOptions) {
+        StringBuilder sb = new StringBuilder(scheme).append("://");
 
-    private static MongodbSchema createSchemaFromSpecifiedConfig(Configuration mongodbConfig) {
-        String[] columnNames = mongodbConfig.get(FIELD_NAME).split(",");
-        LinkedHashMap<String, DataType> schemaFields =
-                generateSchemaFields(Arrays.asList(columnNames));
-        String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
-        String collectionName = mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
-        return new MongodbSchema(
-                databaseName, collectionName, schemaFields, Collections.singletonList(ID_FIELD));
-    }
+        if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
+            sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@");
+        }
+
+        sb.append(checkNotNull(hosts));
 
-    private static MongodbSchema createSchemaFromDynamicConfig(Configuration mongodbConfig) {
-        String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
-        String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
-        String collectionName = mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
-        String url = String.format("mongodb://%s/%s", hosts, databaseName);
-        try (MongoClient mongoClient = MongoClients.create(url)) {
-            MongoDatabase database = mongoClient.getDatabase(databaseName);
-            MongoCollection<Document> collection = database.getCollection(collectionName);
-            Document firstDocument = collection.find().first();
-            return createMongodbSchema(databaseName, collectionName, getColumnNames(firstDocument));
+        if (StringUtils.isNotEmpty(connectionOptions)) {
+            sb.append("/?").append(connectionOptions);
         }
+
+        return sb.toString();
+    }
+
+    private static SchemaAcquisitionMode getModeFromConfig(Configuration mongodbConfig) {
+        return SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
     }
 
     private static List<String> getColumnNames(Document document) {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
index 53e8d11eb..4d2fec69d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
@@ -38,6 +38,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBContainer.PAIMON_USER;
+import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBContainer.PAIMON_USER_PASSWORD;
+
 /** Base test class for {@link org.apache.paimon.flink.action.Action}s related to MongoDB. */
 public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
 
@@ -65,6 +68,8 @@ public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
     protected Map<String, String> getBasicMongoDBConfig() {
         Map<String, String> config = new HashMap<>();
         config.put("hosts", MONGODB_CONTAINER.getHostAndPort());
+        config.put("username", PAIMON_USER);
+        config.put("password", PAIMON_USER_PASSWORD);
         return config;
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
index 969a5fc25..177749c51 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
@@ -46,6 +46,10 @@ public class MongoDBContainer extends org.testcontainers.containers.MongoDBConta
 
     private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$");
 
+    public static final String PAIMON_USER = "flinkuser";
+
+    public static final String PAIMON_USER_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
+
     public static final int MONGODB_PORT = 27017;
 
     public MongoDBContainer(String imageName) {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
new file mode 100644
index 000000000..fa08d2fc8
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import org.apache.flink.configuration.Configuration;
+import org.bson.Document;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link MongodbSchema}. */
+public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
+
+    @BeforeAll
+    public static void initMongoDB() {
+        // Create a real MongoDB client and insert a document to infer the schema
+        MongoClientSettings.Builder settingsBuilder =
+                MongoClientSettings.builder()
+                        .applyToClusterSettings(
+                                builder ->
+                                        builder.hosts(
+                                                Collections.singletonList(
+                                                        new ServerAddress(
+                                                                MONGODB_CONTAINER
+                                                                        .getHostAndPort()))))
+                        .credential(
+                                MongoCredential.createCredential(
+                                        MongoDBContainer.PAIMON_USER,
+                                        "admin",
+                                        MongoDBContainer.PAIMON_USER_PASSWORD.toCharArray()));
+
+        MongoClientSettings settings = settingsBuilder.build();
+        try (MongoClient mongoClient = MongoClients.create(settings)) {
+            MongoDatabase database = mongoClient.getDatabase("testDatabase");
+            MongoCollection<Document> collection = database.getCollection("testCollection");
+            Document doc = new Document("name", "Alice").append("age", 30);
+            collection.insertOne(doc);
+        }
+    }
+
+    @Test
+    public void testCreateSchemaFromValidConfig() {
+        Configuration mongodbConfig = new Configuration();
+        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.setString(
+                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
+        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
+        MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);
+        assertNotNull(schema);
+        assertEquals("testDatabase", schema.databaseName());
+        assertEquals("testCollection", schema.tableName());
+    }
+
+    @Test
+    public void testCreateSchemaFromInvalidHost() {
+        Configuration mongodbConfig = new Configuration();
+        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345");
+        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.setString(
+                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
+        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
+
+        assertThrows(RuntimeException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig));
+    }
+
+    @Test
+    public void testCreateSchemaFromIncompleteConfig() {
+        // Create a Configuration object with missing necessary settings
+        Configuration mongodbConfig = new Configuration();
+        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        // Expect an exception to be thrown due to missing necessary settings
+        assertThrows(
+                NullPointerException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig));
+    }
+
+    @Test
+    public void testCreateSchemaFromDynamicConfig() {
+        // Create a Configuration object with the necessary settings
+        Configuration mongodbConfig = new Configuration();
+        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.setString(
+                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
+        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
+
+        // Call the method and check the results
+        MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);
+
+        // Verify the schema
+        assertNotNull(schema);
+        assertEquals("testDatabase", schema.databaseName());
+        assertEquals("testCollection", schema.tableName());
+
+        LinkedHashMap<String, DataType> expectedFields = new LinkedHashMap<>();
+        expectedFields.put("name", DataTypes.STRING());
+        expectedFields.put("age", DataTypes.STRING());
+        expectedFields.put("_id", DataTypes.STRING());
+
+        assertEquals(expectedFields, schema.fields());
+    }
+
+    @Test
+    public void testCreateSchemaFromInvalidDatabase() {
+        Configuration mongodbConfig = new Configuration();
+        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.setString(
+                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "invalidDatabase");
+        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection");
+
+        assertThrows(RuntimeException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig));
+    }
+
+    @Test
+    public void testCreateSchemaFromInvalidCollection() {
+        Configuration mongodbConfig = new Configuration();
+        mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort());
+        mongodbConfig.setString(MongoDBSourceOptions.USERNAME, MongoDBContainer.PAIMON_USER);
+        mongodbConfig.setString(
+                MongoDBSourceOptions.PASSWORD, MongoDBContainer.PAIMON_USER_PASSWORD);
+        mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin");
+        mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
+        mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "invalidCollection");
+
+        assertThrows(RuntimeException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig));
+    }
+}

Reply via email to