This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 4c6b067d [feature][datasource] Add Mongodb datasource
4c6b067d is described below
commit 4c6b067dfefa42a00e69bbb09f94736f371ad81d
Author: XiaoJiang521 <[email protected]>
AuthorDate: Tue Nov 7 14:45:29 2023 +0800
[feature][datasource] Add Mongodb datasource
---
.../classloader/DatasourceLoadConfig.java | 6 +-
.../datasource-mongodb/pom.xml | 64 ++++++++++++
.../plugin/mongodb/MongoDataSoueceChannel.java | 106 +++++++++++++++++++
.../plugin/mongodb/MongoDataSourceFactory.java | 57 ++++++++++
.../datasource/plugin/mongodb/MongoOptionRule.java | 51 +++++++++
.../plugin/mongodb/MongoRequestParamsUtils.java | 33 ++++++
.../datasource/plugin/api/DataSourceChannel.java | 14 ++-
.../seatunnel-datasource-plugins/pom.xml | 1 +
.../src/main/bin/download_datasource.sh | 1 +
.../impl/MongoDBDataSourceConfigSwitcher.java | 115 +++++++++++++++++++++
.../resources/connector-datasource-mapper.yaml | 19 ++++
seatunnel-web-dist/pom.xml | 13 +++
12 files changed, 477 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java
b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java
index ef3dc0d7..a8b48078 100644
---
a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java
+++
b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java
@@ -92,6 +92,8 @@ public class DatasourceLoadConfig {
classLoaderFactoryName.put(
"JDBC-STARROCKS",
"org.apache.seatunnel.datasource.plugin.starrocks.jdbc.StarRocksJdbcDataSourceFactory");
+ classLoaderFactoryName.put(
+ "MONGODB",
"com.apache.seatunnel.datasource.plugin.mongodb.MongoDataSourceFactory");
classLoaderJarName.put("JDBC-ORACLE", "datasource-jdbc-oracle-");
classLoaderJarName.put("JDBC-CLICKHOUSE",
"datasource-jdbc-clickhouse-");
@@ -111,6 +113,7 @@ public class DatasourceLoadConfig {
classLoaderJarName.put("STARROCKS", "datasource-starrocks-");
classLoaderJarName.put("S3-REDSHIFT", "datasource-s3redshift-");
classLoaderJarName.put("JDBC-STARROCKS", "datasource-jdbc-starrocks-");
+ classLoaderJarName.put("MONGODB", "datasource-mongodb-");
}
public static final Set<String> pluginSet =
@@ -127,7 +130,8 @@ public class DatasourceLoadConfig {
"MySQL-CDC",
"S3",
"SqlServer-CDC",
- "StarRocks");
+ "StarRocks",
+ "MongoDB");
public static Map<String, DatasourceClassLoader> datasourceClassLoaders =
new HashMap<>();
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml
new file mode 100644
index 00000000..8bbf4dcf
--- /dev/null
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-datasource-plugins</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>datasource-mongodb</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>datasource-plugins-api</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-sync</artifactId>
+ <version>4.7.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <skip>${e2e.dependency.skip}</skip>
+ <appendOutput>true</appendOutput>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java
new file mode 100644
index 00000000..55ba487d
--- /dev/null
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSoueceChannel.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.seatunnel.datasource.plugin.mongodb;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
+import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
+import org.apache.seatunnel.datasource.plugin.api.model.TableField;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.ImmutableList;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoIterable;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Slf4j
+public class MongoDataSoueceChannel implements DataSourceChannel {
+
+ private static final String DATABASE = "default";
+
+ @Override
+ public OptionRule getDataSourceOptions(@NonNull String pluginName) {
+ return MongoOptionRule.optionRule();
+ }
+
+ @Override
+ public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull
String pluginName) {
+ return MongoOptionRule.metadataRule();
+ }
+
+ public List<String> getTables(
+ @NonNull String pluginName,
+ Map<String, String> requestParams,
+ String database,
+ Map<String, String> options) {
+ checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE),
"database must be default");
+
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> getDatabases(
+ @NonNull String pluginName, @NonNull Map<String, String>
requestParams) {
+ return ImmutableList.of(DATABASE);
+ }
+
+ @Override
+ public List<TableField> getTableFields(
+ @NonNull String pluginName,
+ @NonNull Map<String, String> requestParams,
+ @NonNull String database,
+ @NonNull String table) {
+ checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE),
"database must be default");
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean checkDataSourceConnectivity(
+ @NonNull String pluginName, @NonNull Map<String, String>
requestParams) {
+
+ try (MongoClient mongoClient = createMongoClient(requestParams)) {
+ // Verify if the connection to mongodb was successful
+ MongoIterable<String> databaseNames =
mongoClient.listDatabaseNames();
+ if (databaseNames.iterator().hasNext()) {
+ log.info("mongoDB connection successful");
+ return true;
+ } else {
+ return false;
+ }
+ } catch (Exception e) {
+ throw new DataSourcePluginException(
+ "check MongoDB connectivity failed, " + e.getMessage(), e);
+ }
+ }
+
+ // Resolve the URI in requestParams of Map type
+ private MongoClient createMongoClient(Map<String, String> requestParams) {
+
+ return MongoClients.create(
+
MongoRequestParamsUtils.parseStringFromRequestParams(requestParams));
+ }
+}
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java
new file mode 100644
index 00000000..b08f5b30
--- /dev/null
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoDataSourceFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.seatunnel.datasource.plugin.mongodb;
+
+import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
+import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
+import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
+import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+@AutoService(DataSourceFactory.class)
+public class MongoDataSourceFactory implements DataSourceFactory {
+ public static final String MONGO_PLUGIN_NAME = "MongoDB";
+ public static final String MONGO_PLUGIN_ICON = "MongoDB";
+ public static final String MONGO_PLUGIN_VERSION = "1.0.0";
+
+ @Override
+ public String factoryIdentifier() {
+ return MONGO_PLUGIN_NAME;
+ }
+
+ @Override
+ public Set<DataSourcePluginInfo> supportedDataSources() {
+ return Sets.newHashSet(
+ DataSourcePluginInfo.builder()
+ .name(MONGO_PLUGIN_NAME)
+ .icon(MONGO_PLUGIN_ICON)
+ .version(MONGO_PLUGIN_VERSION)
+ .supportVirtualTables(true)
+ .type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode())
+ .build());
+ }
+
+ @Override
+ public DataSourceChannel createChannel() {
+ return new MongoDataSoueceChannel();
+ }
+}
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java
new file mode 100644
index 00000000..cde0f89f
--- /dev/null
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoOptionRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.seatunnel.datasource.plugin.mongodb;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+
+public class MongoOptionRule {
+
+ public static final Option<String> URI =
+ Options.key("uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The MongoDB connection uri.");
+
+ public static final Option<String> DATABASE =
+ Options.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of MongoDB database to read or
write.");
+
+ public static final Option<String> COLLECTION =
+ Options.key("collection")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of MongoDB collection to read
or write.");
+
+ public static OptionRule optionRule() {
+ return OptionRule.builder().required(URI).build();
+ }
+
+ public static OptionRule metadataRule() {
+ return OptionRule.builder().required(DATABASE, COLLECTION).build();
+ }
+}
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java
new file mode 100644
index 00000000..eb342acf
--- /dev/null
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mongodb/src/main/java/com/apache/seatunnel/datasource/plugin/mongodb/MongoRequestParamsUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.seatunnel.datasource.plugin.mongodb;
+
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MongoRequestParamsUtils {
+
+ public static String parseStringFromRequestParams(Map<String, String>
requestParams) {
+ checkArgument(
+ requestParams.containsKey(MongoOptionRule.URI.key()),
+ String.format("Missing %s in requestParams",
MongoOptionRule.URI.key()));
+
+ return requestParams.get(MongoOptionRule.URI.key());
+ }
+}
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java
index 72a2ea20..8fb645d4 100644
---
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannel.java
@@ -28,6 +28,8 @@ import lombok.NonNull;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
public interface DataSourceChannel {
@@ -71,11 +73,19 @@ public interface DataSourceChannel {
@NonNull String database,
@NonNull String table);
- Map<String, List<TableField>> getTableFields(
+ default Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
- @NonNull List<String> tables);
+ @NonNull List<String> tables) {
+ return tables.parallelStream()
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ table ->
+ getTableFields(
+ pluginName, requestParams,
database, table)));
+ }
/**
* just check metadata field is right and used by virtual table
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
index 5f2296bb..44c6ac6c 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
@@ -44,6 +44,7 @@
<module>datasource-s3</module>
<module>datasource-sqlserver-cdc</module>
<module>datasource-jdbc-tidb</module>
+ <module>datasource-mongodb</module>
</modules>
</project>
diff --git a/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh
b/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh
index 1c09af2f..3ea36ab6 100644
--- a/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh
+++ b/seatunnel-server/seatunnel-app/src/main/bin/download_datasource.sh
@@ -43,6 +43,7 @@ datasource_list=(
"datasource-s3"
"datasource-sqlserver-cdc"
"datasource-starrocks"
+ "datasource-mongodb"
)
# the datasource default version is 1.0.0, you can also choose a custom
version. eg: 1.1.2: sh install-datasource.sh 2.1.2
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..2153c52e
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/MongoDBDataSourceConfigSwitcher.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thirdparty.datasource.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import
org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import
org.apache.seatunnel.app.thirdparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@AutoService(DataSourceConfigSwitcher.class)
+@Slf4j
+public class MongoDBDataSourceConfigSwitcher extends
AbstractDataSourceConfigSwitcher {
+ private static final String DATABASE = "database";
+ private static final String COLLECTION = "collection";
+ private static final String SCHEMA = "schema";
+
+ @Override
+ public String getDataSourceName() {
+ return "MONGODB";
+ }
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<RequiredOption> addRequiredOptions,
+ List<Option<?>> addOptionalOptions,
+ List<String> excludedKeys) {
+ excludedKeys.add(SCHEMA);
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ addRequiredOptions,
+ addOptionalOptions,
+ excludedKeys);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ // Use field to generate the schema
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE,
+ ConfigValueFactory.fromAnyRef(
+
virtualTableDetail.getDatasourceProperties().get(DATABASE)));
+ connectorConfig =
+ connectorConfig.withValue(
+ COLLECTION,
+ ConfigValueFactory.fromAnyRef(
+
virtualTableDetail.getDatasourceProperties().get(COLLECTION)));
+ if (pluginType == PluginType.SOURCE) {
+ connectorConfig =
+ connectorConfig.withValue(
+ SCHEMA,
+
KafkaKingbaseDataSourceConfigSwitcher.SchemaGenerator
+ .generateSchemaBySelectTableFields(
+ virtualTableDetail,
selectTableFields)
+ .root());
+ }
+
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
index 917c677b..d322d2e7 100644
---
a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
+++
b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
@@ -54,6 +54,10 @@ connector-datasource-mapper:
dataSources:
- Postgres-CDC
+ MongoDB:
+ dataSources:
+ - MongoDB
+
sourceDatasourceFeatures:
JDBC-Mysql:
businessMode:
@@ -123,6 +127,13 @@ connector-datasource-mapper:
- MULTIPLE_TABLE
- SPLIT_TABLE
+ MongoDB:
+ businessMode:
+ - DATA_INTEGRATION
+ sceneMode:
+ - SINGLE_TABLE
+ jobMode:
+ - BATCH
sinkDatasourceFeatures:
@@ -196,3 +207,11 @@ connector-datasource-mapper:
- DATA_INTEGRATION
sceneMode:
- SINGLE_TABLE
+
+ MongoDB:
+ businessMode:
+ - DATA_INTEGRATION
+ - DATA_REPLICA
+ sceneMode:
+ - SINGLE_TABLE
+ - MULTIPLE_TABLE
\ No newline at end of file
diff --git a/seatunnel-web-dist/pom.xml b/seatunnel-web-dist/pom.xml
index 7e83b1ac..f19e379a 100644
--- a/seatunnel-web-dist/pom.xml
+++ b/seatunnel-web-dist/pom.xml
@@ -350,6 +350,19 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>datasource-mongodb</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- jdbc driver -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>