This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f8d224b5f [flink] Fix mongodb CDC Ingestion QueryException (#2028)
f8d224b5f is described below
commit f8d224b5f356b7f6a5f230ea5c286195d6f0ee24
Author: monster <[email protected]>
AuthorDate: Tue Sep 19 10:06:12 2023 +0800
[flink] Fix mongodb CDC Ingestion QueryException (#2028)
---
.../flink/action/cdc/mongodb/MongodbSchema.java | 123 ++++++++++++---
.../cdc/mongodb/MongoDBActionITCaseBase.java | 5 +
.../flink/action/cdc/mongodb/MongoDBContainer.java | 4 +
.../action/cdc/mongodb/MongodbSchemaITCase.java | 169 +++++++++++++++++++++
4 files changed, 277 insertions(+), 24 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
index c266c163f..fbf648eeb 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
@@ -21,14 +21,19 @@ package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.bson.Document;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -36,6 +41,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
+import static
com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
@@ -86,43 +93,111 @@ public class MongodbSchema {
return primaryKeys;
}
+ /**
+ * Creates a {@link MongodbSchema} from the provided configuration. The
+ * schema is built in one of two modes:
+ *
+ * <ul>
+ * <li><b>SPECIFIED</b>: the schema is built from the column names given
+ * explicitly in the configuration. The data types of all columns are
+ * assumed to be STRING.
+ * <li><b>DYNAMIC</b>: the schema is inferred dynamically from the first
+ * document in the specified MongoDB collection.
+ * </ul>
+ *
+ * <p>The Configuration passed to this method must have the database name
+ * and collection name set. The DYNAMIC mode also requires the hosts and
+ * accepts an optional username, password and connection options. The
+ * SPECIFIED mode also requires the field names.
+ */
public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) {
SchemaAcquisitionMode mode = getModeFromConfig(mongodbConfig);
+ String databaseName =
+ Objects.requireNonNull(
+ mongodbConfig.get(MongoDBSourceOptions.DATABASE),
+ "Database name cannot be null");
+ String collectionName =
+ Objects.requireNonNull(
+ mongodbConfig.get(MongoDBSourceOptions.COLLECTION),
+ "Collection name cannot be null");
+
switch (mode) {
case SPECIFIED:
- return createSchemaFromSpecifiedConfig(mongodbConfig);
+ String[] columnNames =
+ Objects.requireNonNull(
+ mongodbConfig.get(FIELD_NAME), "Field
names cannot be null")
+ .split(",");
+ LinkedHashMap<String, DataType> schemaFields =
+ generateSchemaFields(Arrays.asList(columnNames));
+ return new MongodbSchema(
+ databaseName,
+ collectionName,
+ schemaFields,
+ Collections.singletonList(ID_FIELD));
case DYNAMIC:
- return createSchemaFromDynamicConfig(mongodbConfig);
+ String hosts =
+ Objects.requireNonNull(
+ mongodbConfig.get(MongoDBSourceOptions.HOSTS),
+ "Hosts cannot be null");
+
+ MongoClientSettings.Builder settingsBuilder =
MongoClientSettings.builder();
+
+ settingsBuilder.applyConnectionString(
+ new ConnectionString(
+ buildConnectionString(
+
mongodbConfig.get(MongoDBSourceOptions.USERNAME),
+
mongodbConfig.get(MongoDBSourceOptions.PASSWORD),
+
mongodbConfig.get(MongoDBSourceOptions.SCHEME),
+ hosts,
+ mongodbConfig.get(
+
MongoDBSourceOptions.CONNECTION_OPTIONS))));
+
+ MongoClientSettings settings = settingsBuilder.build();
+
+ try (MongoClient mongoClient = MongoClients.create(settings)) {
+ MongoDatabase database =
mongoClient.getDatabase(databaseName);
+ MongoCollection<Document> collection =
database.getCollection(collectionName);
+ Document firstDocument = collection.find().first();
+
+ if (firstDocument == null) {
+ throw new IllegalStateException(
+ "No documents in collection to infer schema");
+ }
+
+ return createMongodbSchema(
+ databaseName, collectionName,
getColumnNames(firstDocument));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to create schema from MongoDB collection",
e);
+ }
default:
throw new IllegalArgumentException("Unsupported schema
acquisition mode: " + mode);
}
}
- private static SchemaAcquisitionMode getModeFromConfig(Configuration
mongodbConfig) {
- return
SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
- }
+ public static String buildConnectionString(
+ @Nullable String username,
+ @Nullable String password,
+ String scheme,
+ String hosts,
+ @Nullable String connectionOptions) {
+ StringBuilder sb = new StringBuilder(scheme).append("://");
- private static MongodbSchema createSchemaFromSpecifiedConfig(Configuration
mongodbConfig) {
- String[] columnNames = mongodbConfig.get(FIELD_NAME).split(",");
- LinkedHashMap<String, DataType> schemaFields =
- generateSchemaFields(Arrays.asList(columnNames));
- String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
- String collectionName =
mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
- return new MongodbSchema(
- databaseName, collectionName, schemaFields,
Collections.singletonList(ID_FIELD));
- }
+ if (StringUtils.isNotEmpty(username) &&
StringUtils.isNotEmpty(password)) {
+
sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@");
+ }
+
+ sb.append(checkNotNull(hosts));
- private static MongodbSchema createSchemaFromDynamicConfig(Configuration
mongodbConfig) {
- String hosts = mongodbConfig.get(MongoDBSourceOptions.HOSTS);
- String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
- String collectionName =
mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
- String url = String.format("mongodb://%s/%s", hosts, databaseName);
- try (MongoClient mongoClient = MongoClients.create(url)) {
- MongoDatabase database = mongoClient.getDatabase(databaseName);
- MongoCollection<Document> collection =
database.getCollection(collectionName);
- Document firstDocument = collection.find().first();
- return createMongodbSchema(databaseName, collectionName,
getColumnNames(firstDocument));
+ if (StringUtils.isNotEmpty(connectionOptions)) {
+ sb.append("/?").append(connectionOptions);
}
+
+ return sb.toString();
+ }
+
+ private static SchemaAcquisitionMode getModeFromConfig(Configuration
mongodbConfig) {
+ return
SchemaAcquisitionMode.valueOf(mongodbConfig.get(START_MODE).toUpperCase());
}
private static List<String> getColumnNames(Document document) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
index 53e8d11eb..4d2fec69d 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionITCaseBase.java
@@ -38,6 +38,9 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBContainer.PAIMON_USER;
+import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBContainer.PAIMON_USER_PASSWORD;
+
/** Base test class for {@link org.apache.paimon.flink.action.Action}s related
to MongoDB. */
public abstract class MongoDBActionITCaseBase extends CdcActionITCaseBase {
@@ -65,6 +68,8 @@ public abstract class MongoDBActionITCaseBase extends
CdcActionITCaseBase {
protected Map<String, String> getBasicMongoDBConfig() {
Map<String, String> config = new HashMap<>();
config.put("hosts", MONGODB_CONTAINER.getHostAndPort());
+ config.put("username", PAIMON_USER);
+ config.put("password", PAIMON_USER_PASSWORD);
return config;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
index 969a5fc25..177749c51 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBContainer.java
@@ -46,6 +46,10 @@ public class MongoDBContainer extends
org.testcontainers.containers.MongoDBConta
private static final Pattern COMMENT_PATTERN =
Pattern.compile("^(.*)//.*$");
+ public static final String PAIMON_USER = "flinkuser";
+
+ public static final String PAIMON_USER_PASSWORD =
"a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
+
public static final int MONGODB_PORT = 27017;
public MongoDBContainer(String imageName) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
new file mode 100644
index 000000000..fa08d2fc8
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mongodb;
+
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import org.apache.flink.configuration.Configuration;
+import org.bson.Document;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Tests for {@link MongodbSchema}. */
+public class MongodbSchemaITCase extends MongoDBActionITCaseBase {
+
+ @BeforeAll
+ public static void initMongoDB() {
+ // Create a real MongoDB client and insert a document to infer the
schema
+ MongoClientSettings.Builder settingsBuilder =
+ MongoClientSettings.builder()
+ .applyToClusterSettings(
+ builder ->
+ builder.hosts(
+ Collections.singletonList(
+ new ServerAddress(
+
MONGODB_CONTAINER
+
.getHostAndPort()))))
+ .credential(
+ MongoCredential.createCredential(
+ MongoDBContainer.PAIMON_USER,
+ "admin",
+
MongoDBContainer.PAIMON_USER_PASSWORD.toCharArray()));
+
+ MongoClientSettings settings = settingsBuilder.build();
+ try (MongoClient mongoClient = MongoClients.create(settings)) {
+ MongoDatabase database = mongoClient.getDatabase("testDatabase");
+ MongoCollection<Document> collection =
database.getCollection("testCollection");
+ Document doc = new Document("name", "Alice").append("age", 30);
+ collection.insertOne(doc);
+ }
+ }
+
+ @Test
+ public void testCreateSchemaFromValidConfig() {
+ Configuration mongodbConfig = new Configuration();
+ mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.setString(
+ MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
+ mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
+ MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);
+ assertNotNull(schema);
+ assertEquals("testDatabase", schema.databaseName());
+ assertEquals("testCollection", schema.tableName());
+ }
+
+ @Test
+ public void testCreateSchemaFromInvalidHost() {
+ Configuration mongodbConfig = new Configuration();
+ mongodbConfig.setString(MongoDBSourceOptions.HOSTS, "127.0.0.1:12345");
+ mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.setString(
+ MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
+ mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
+
+ assertThrows(RuntimeException.class, () ->
MongodbSchema.getMongodbSchema(mongodbConfig));
+ }
+
+ @Test
+ public void testCreateSchemaFromIncompleteConfig() {
+ // Create a Configuration object with missing necessary settings
+ Configuration mongodbConfig = new Configuration();
+ mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ // Expect an exception to be thrown due to missing necessary settings
+ assertThrows(
+ NullPointerException.class, () ->
MongodbSchema.getMongodbSchema(mongodbConfig));
+ }
+
+ @Test
+ public void testCreateSchemaFromDynamicConfig() {
+ // Create a Configuration object with the necessary settings
+ Configuration mongodbConfig = new Configuration();
+ mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.setString(
+ MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
+ mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
+
+ // Call the method and check the results
+ MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig);
+
+ // Verify the schema
+ assertNotNull(schema);
+ assertEquals("testDatabase", schema.databaseName());
+ assertEquals("testCollection", schema.tableName());
+
+ LinkedHashMap<String, DataType> expectedFields = new LinkedHashMap<>();
+ expectedFields.put("name", DataTypes.STRING());
+ expectedFields.put("age", DataTypes.STRING());
+ expectedFields.put("_id", DataTypes.STRING());
+
+ assertEquals(expectedFields, schema.fields());
+ }
+
+ @Test
+ public void testCreateSchemaFromInvalidDatabase() {
+ Configuration mongodbConfig = new Configuration();
+ mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.setString(
+ MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.setString(MongoDBSourceOptions.DATABASE,
"invalidDatabase");
+ mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"testCollection");
+
+ assertThrows(RuntimeException.class, () ->
MongodbSchema.getMongodbSchema(mongodbConfig));
+ }
+
+ @Test
+ public void testCreateSchemaFromInvalidCollection() {
+ Configuration mongodbConfig = new Configuration();
+ mongodbConfig.setString(MongoDBSourceOptions.HOSTS,
MONGODB_CONTAINER.getHostAndPort());
+ mongodbConfig.setString(MongoDBSourceOptions.USERNAME,
MongoDBContainer.PAIMON_USER);
+ mongodbConfig.setString(
+ MongoDBSourceOptions.PASSWORD,
MongoDBContainer.PAIMON_USER_PASSWORD);
+ mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS,
"authSource=admin");
+ mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase");
+ mongodbConfig.setString(MongoDBSourceOptions.COLLECTION,
"invalidCollection");
+
+ assertThrows(RuntimeException.class, () ->
MongodbSchema.getMongodbSchema(mongodbConfig));
+ }
+}