This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c6b067d [feature][datasource] Add Mongodb datasource
4c6b067d is described below

commit 4c6b067dfefa42a00e69bbb09f94736f371ad81d
Author: XiaoJiang521 <[email protected]>
AuthorDate: Tue Nov 7 14:45:29 2023 +0800

    [feature][datasource] Add Mongodb datasource
---
 .../classloader/DatasourceLoadConfig.java          |   6 +-
 .../datasource-mongodb/pom.xml                     |  64 ++++++++++++
 .../plugin/mongodb/MongoDataSoueceChannel.java     | 106 +++++++++++++++++++
 .../plugin/mongodb/MongoDataSourceFactory.java     |  57 ++++++++++
 .../datasource/plugin/mongodb/MongoOptionRule.java |  51 +++++++++
 .../plugin/mongodb/MongoRequestParamsUtils.java    |  33 ++++++
 .../datasource/plugin/api/DataSourceChannel.java   |  14 ++-
 .../seatunnel-datasource-plugins/pom.xml           |   1 +
 .../src/main/bin/download_datasource.sh            |   1 +
 .../impl/MongoDBDataSourceConfigSwitcher.java      | 115 +++++++++++++++++++++
 .../resources/connector-datasource-mapper.yaml     |  19 ++++
 seatunnel-web-dist/pom.xml                         |  13 +++
 12 files changed, 477 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java
 
b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java
index ef3dc0d7..a8b48078 100644
--- 
a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java
+++ 
b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java
@@ -92,6 +92,8 @@ public class DatasourceLoadConfig {
         classLoaderFactoryName.put(
                 "JDBC-STARROCKS",
                 
"org.apache.seatunnel.datasource.plugin.starrocks.jdbc.StarRocksJdbcDataSourceFactory");
+        classLoaderFactoryName.put(
+                "MONGODB", 
"com.apache.seatunnel.datasource.plugin.mongodb.MongoDataSourceFactory");
 
         classLoaderJarName.put("JDBC-ORACLE", "datasource-jdbc-oracle-");
         classLoaderJarName.put("JDBC-CLICKHOUSE", 
"datasource-jdbc-clickhouse-");
@@ -111,6 +113,7 @@ public class DatasourceLoadConfig {
         classLoaderJarName.put("STARROCKS", "datasource-starrocks-");
         classLoaderJarName.put("S3-REDSHIFT", "datasource-s3redshift-");
         classLoaderJarName.put("JDBC-STARROCKS", "datasource-jdbc-starrocks-");
+        classLoaderJarName.put("MONGODB", "datasource-mongodb-");
     }
 
     public static final Set<String> pluginSet =
@@ -127,7 +130,8 @@ public class DatasourceLoadConfig {
                     "MySQL-CDC",
                     "S3",
                     "SqlServer-CDC",
-                    "StarRocks");
+                    "StarRocks",
+                    "MongoDB");
 
     public static Map<String, DatasourceClassLoader> datasourceClassLoaders = 
new HashMap<>();
 
diff --git 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml
new file mode 100644
index 00000000..8bbf4dcf
--- /dev/null
+++ 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-datasource-plugins</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>datasource-mongodb</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>datasource-plugins-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver-sync</artifactId>
+            <version>4.7.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <skip>${e2e.dependency.skip}</skip>
+                    <appendOutput>true</appendOutput>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java
 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java
new file mode 100644
index 00000000..55ba487d
--- /dev/null
+++ 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.seatunnel.datasource.plugin.mongodb;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
+import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
+import org.apache.seatunnel.datasource.plugin.api.model.TableField;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.ImmutableList;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoIterable;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Slf4j
+public class MongoDataSoueceChannel implements DataSourceChannel {
+
+    private static final String DATABASE = "default";
+
+    @Override
+    public OptionRule getDataSourceOptions(@NonNull String pluginName) {
+        return MongoOptionRule.optionRule();
+    }
+
+    @Override
+    public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull 
String pluginName) {
+        return MongoOptionRule.metadataRule();
+    }
+
+    public List<String> getTables(
+            @NonNull String pluginName,
+            Map<String, String> requestParams,
+            String database,
+            Map<String, String> options) {
+        checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), 
"database must be default");
+
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<String> getDatabases(
+            @NonNull String pluginName, @NonNull Map<String, String> 
requestParams) {
+        return ImmutableList.of(DATABASE);
+    }
+
+    @Override
+    public List<TableField> getTableFields(
+            @NonNull String pluginName,
+            @NonNull Map<String, String> requestParams,
+            @NonNull String database,
+            @NonNull String table) {
+        checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), 
"database must be default");
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean checkDataSourceConnectivity(
+            @NonNull String pluginName, @NonNull Map<String, String> 
requestParams) {
+
+        try (MongoClient mongoClient = createMongoClient(requestParams)) {
+            // Verify if the connection to mongodb was successful
+            MongoIterable<String> databaseNames = 
mongoClient.listDatabaseNames();
+            if (databaseNames.iterator().hasNext()) {
+                log.info("mongoDB connection successful");
+                return true;
+            } else {
+                return false;
+            }
+        } catch (Exception e) {
+            throw new DataSourcePluginException(
+                    "check MongoDB connectivity failed, " + e.getMessage(), e);
+        }
+    }
+
+    // Resolve the URI in requestParams of Map type
+    private MongoClient createMongoClient(Map<String, String> requestParams) {
+
+        return MongoClients.create(
+                
MongoRequestParamsUtils.parseStringFromRequestParams(requestParams));
+    }
+}
diff --git 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java
 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java
new file mode 100644
index 00000000..b08f5b30
--- /dev/null
+++ 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.seatunnel.datasource.plugin.mongodb;
+
+import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
+import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
+import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
+import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+@AutoService(DataSourceFactory.class)
+public class MongoDataSourceFactory implements DataSourceFactory {
+    public static final String MONGO_PLUGIN_NAME = "MongoDB";
+    public static final String MONGO_PLUGIN_ICON = "MongoDB";
+    public static final String MONGO_PLUGIN_VERSION = "1.0.0";
+
+    @Override
+    public String factoryIdentifier() {
+        return MONGO_PLUGIN_NAME;
+    }
+
+    @Override
+    public Set<DataSourcePluginInfo> supportedDataSources() {
+        return Sets.newHashSet(
+                DataSourcePluginInfo.builder()
+                        .name(MONGO_PLUGIN_NAME)
+                        .icon(MONGO_PLUGIN_ICON)
+                        .version(MONGO_PLUGIN_VERSION)
+                        .supportVirtualTables(true)
+                        .type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode())
+                        .build());
+    }
+
+    @Override
+    public DataSourceChannel createChannel() {
+        return new MongoDataSoueceChannel();
+    }
+}
diff --git 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java
 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java
new file mode 100644
index 00000000..cde0f89f
--- /dev/null
+++ 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.seatunnel.datasource.plugin.mongodb;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+
+public class MongoOptionRule {
+
+    public static final Option<String> URI =
+            Options.key("uri")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The MongoDB connection uri.");
+
+    public static final Option<String> DATABASE =
+            Options.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The name of MongoDB database to read or 
write.");
+
+    public static final Option<String> COLLECTION =
+            Options.key("collection")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The name of MongoDB collection to read 
or write.");
+
+    public static OptionRule optionRule() {
+        return OptionRule.builder().required(URI).build();
+    }
+
+    public static OptionRule metadataRule() {
+        return OptionRule.builder().required(DATABASE, COLLECTION).build();
+    }
+}
diff --git 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java
 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java
new file mode 100644
index 00000000..eb342acf
--- /dev/null
+++ 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.seatunnel.datasource.plugin.mongodb;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MongoRequestParamsUtils {
+
+    public static String parseStringFromRequestParams(Map<String, String> 
requestParams) {
+        checkArgument(
+                requestParams.containsKey(MongoOptionRule.URI.key()),
+                String.format("Missing %s in requestParams", 
MongoOptionRule.URI.key()));
+
+        return requestParams.get(MongoOptionRule.URI.key());
+    }
+}
diff --git 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java
 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java
index 72a2ea20..8fb645d4 100644
--- 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java
+++ 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java
@@ -28,6 +28,8 @@ import lombok.NonNull;
 import java.sql.Connection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public interface DataSourceChannel {
 
@@ -71,11 +73,19 @@ public interface DataSourceChannel {
             @NonNull String database,
             @NonNull String table);
 
-    Map<String, List<TableField>> getTableFields(
+    default Map<String, List<TableField>> getTableFields(
             @NonNull String pluginName,
             @NonNull Map<String, String> requestParams,
             @NonNull String database,
-            @NonNull List<String> tables);
+            @NonNull List<String> tables) {
+        return tables.parallelStream()
+                .collect(
+                        Collectors.toMap(
+                                Function.identity(),
+                                table ->
+                                        getTableFields(
+                                                pluginName, requestParams, 
database, table)));
+    }
 
     /**
      * just check metadata field is right and used by virtual table
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml 
b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
index 5f2296bb..44c6ac6c 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
@@ -44,6 +44,7 @@
         <module>datasource-s3</module>
         <module>datasource-sqlserver-cdc</module>
         <module>datasource-jdbc-tidb</module>
+        <module>datasource-mongodb</module>
     </modules>
 
 </project>
diff --git a/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh 
b/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh
index 1c09af2f..3ea36ab6 100644
--- a/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh
+++ b/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh
@@ -43,6 +43,7 @@ datasource_list=(
   "datasource-s3"
   "datasource-sqlserver-cdc"
   "datasource-starrocks"
+  "datasource-mongodb"
 )
 
 # the datasource default version is 1.0.0, you can also choose a custom 
version. eg: 1.1.2:  sh install-datasource.sh 2.1.2
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..2153c52e
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thirdparty.datasource.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import 
org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import 
org.apache.seatunnel.app.thirdparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@AutoService(DataSourceConfigSwitcher.class)
+@Slf4j
+public class MongoDBDataSourceConfigSwitcher extends 
AbstractDataSourceConfigSwitcher {
+    private static final String DATABASE = "database";
+    private static final String COLLECTION = "collection";
+    private static final String SCHEMA = "schema";
+
+    @Override
+    public String getDataSourceName() {
+        return "MONGODB";
+    }
+
+    @Override
+    public FormStructure filterOptionRule(
+            String connectorName,
+            OptionRule dataSourceOptionRule,
+            OptionRule virtualTableOptionRule,
+            BusinessMode businessMode,
+            PluginType pluginType,
+            OptionRule connectorOptionRule,
+            List<RequiredOption> addRequiredOptions,
+            List<Option<?>> addOptionalOptions,
+            List<String> excludedKeys) {
+        excludedKeys.add(SCHEMA);
+        return super.filterOptionRule(
+                connectorName,
+                dataSourceOptionRule,
+                virtualTableOptionRule,
+                businessMode,
+                pluginType,
+                connectorOptionRule,
+                addRequiredOptions,
+                addOptionalOptions,
+                excludedKeys);
+    }
+
+    @Override
+    public Config mergeDatasourceConfig(
+            Config dataSourceInstanceConfig,
+            VirtualTableDetailRes virtualTableDetail,
+            DataSourceOption dataSourceOption,
+            SelectTableFields selectTableFields,
+            BusinessMode businessMode,
+            PluginType pluginType,
+            Config connectorConfig) {
+        // Use field to generate the schema
+        connectorConfig =
+                connectorConfig.withValue(
+                        DATABASE,
+                        ConfigValueFactory.fromAnyRef(
+                                
virtualTableDetail.getDatasourceProperties().get(DATABASE)));
+        connectorConfig =
+                connectorConfig.withValue(
+                        COLLECTION,
+                        ConfigValueFactory.fromAnyRef(
+                                
virtualTableDetail.getDatasourceProperties().get(COLLECTION)));
+        if (pluginType == PluginType.SOURCE) {
+            connectorConfig =
+                    connectorConfig.withValue(
+                            SCHEMA,
+                            
KafkaKingbaseDataSourceConfigSwitcher.SchemaGenerator
+                                    .generateSchemaBySelectTableFields(
+                                            virtualTableDetail, 
selectTableFields)
+                                    .root());
+        }
+
+        return super.mergeDatasourceConfig(
+                dataSourceInstanceConfig,
+                virtualTableDetail,
+                dataSourceOption,
+                selectTableFields,
+                businessMode,
+                pluginType,
+                connectorConfig);
+    }
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
 
b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
index 917c677b..d322d2e7 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
+++ 
b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
@@ -54,6 +54,10 @@ connector-datasource-mapper:
       dataSources:
         - Postgres-CDC
 
+    MongoDB:
+      dataSources:
+        - MongoDB
+
   sourceDatasourceFeatures:
     JDBC-Mysql:
       businessMode:
@@ -123,6 +127,13 @@ connector-datasource-mapper:
         - MULTIPLE_TABLE
         - SPLIT_TABLE
 
+    MongoDB:
+      businessMode:
+        - DATA_INTEGRATION
+      sceneMode:
+        - SINGLE_TABLE
+      jobMode:
+        - BATCH
 
 
   sinkDatasourceFeatures:
@@ -196,3 +207,11 @@ connector-datasource-mapper:
         - DATA_INTEGRATION
       sceneMode:
         - SINGLE_TABLE
+
+    MongoDB:
+      businessMode:
+        - DATA_INTEGRATION
+        - DATA_REPLICA
+      sceneMode:
+        - SINGLE_TABLE
+        - MULTIPLE_TABLE
\ No newline at end of file
diff --git a/seatunnel-web-dist/pom.xml b/seatunnel-web-dist/pom.xml
index 7e83b1ac..f19e379a 100644
--- a/seatunnel-web-dist/pom.xml
+++ b/seatunnel-web-dist/pom.xml
@@ -350,6 +350,19 @@
                     </exclusions>
                 </dependency>
 
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>datasource-mongodb</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>*</groupId>
+                            <artifactId>*</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+
                 <!-- jdbc driver -->
                 <dependency>
                     <groupId>com.oracle.database.jdbc</groupId>

Reply via email to