This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ffd5778ef [Feature][Connector V2] expose configurable options in MongoDB (#3347)
ffd5778ef is described below

commit ffd5778efc75e6b055188869c78e5b3d5a9a20d8
Author: Eric <[email protected]>
AuthorDate: Thu Nov 10 16:42:15 2022 +0800

    [Feature][Connector V2] expose configurable options in MongoDB (#3347)
    
    * add mongodb config options
    
    add mongodb config options
    
    * add mongodb option rules
---
 docs/en/connector-v2/sink/MongoDB.md               |  4 +--
 docs/en/connector-v2/source/MongoDB.md             |  6 +++-
 .../seatunnel/mongodb/config/MongodbConfig.java    | 31 +++++++++++-----
 .../seatunnel/mongodb/sink/MongodbSink.java        |  2 +-
 .../seatunnel/mongodb/sink/MongodbSinkFactory.java | 41 +++++++++++++++++++++
 .../seatunnel/mongodb/source/MongodbSource.java    |  7 ++--
 .../mongodb/source/MongodbSourceFactory.java       | 42 ++++++++++++++++++++++
 7 files changed, 117 insertions(+), 16 deletions(-)

diff --git a/docs/en/connector-v2/sink/MongoDB.md b/docs/en/connector-v2/sink/MongoDB.md
index 2b94e4804..b21b39c8e 100644
--- a/docs/en/connector-v2/sink/MongoDB.md
+++ b/docs/en/connector-v2/sink/MongoDB.md
@@ -18,11 +18,11 @@ Write data to `MongoDB`
 ## Options
 
 | name           | type   | required | default value |
-|--------------- | ------ |----------| ------------- |
+|--------------- |--------|----------| ------------- |
 | uri            | string | yes      | -             |
 | database       | string | yes      | -             |
 | collection     | string | yes      | -             |
-| common-options |        | no       | -             |
+| common-options | config | no       | -             |
 
 ### uri [string]
 
diff --git a/docs/en/connector-v2/source/MongoDB.md b/docs/en/connector-v2/source/MongoDB.md
index 2e36b21ab..c6c3f43ba 100644
--- a/docs/en/connector-v2/source/MongoDB.md
+++ b/docs/en/connector-v2/source/MongoDB.md
@@ -23,7 +23,7 @@ Read data from MongoDB.
 | database       | string | yes      | -             |
 | collection     | string | yes      | -             |
 | schema         | object | yes      | -             |
-| common-options |        | yes      | -             |
+| common-options | config | no       | -             |
 
 ### uri [string]
 
@@ -82,3 +82,7 @@ mongodb {
 ### 2.2.0-beta 2022-09-26
 
 - Add MongoDB Source Connector
+
+### Next Version
+
+- common-options is not a required option
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
index 45857e85b..d38f5134b 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.mongodb.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
 import java.io.Serializable;
 
 /**
@@ -24,16 +27,28 @@ import java.io.Serializable;
  */
 public class MongodbConfig implements Serializable {
 
-    public static final String URI = "uri";
-
-    public static final String DATABASE = "database";
-
-    public static final String COLLECTION = "collection";
-
-    public static final String SCHEMA = "schema";
-
+    public static final Option<String> URI =
+        Options.key("uri")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("MongoDB uri");
+
+    public static final Option<String> DATABASE =
+        Options.key("database")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("MongoDB database name");
+
+    public static final Option<String> COLLECTION =
+        Options.key("collection")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("MongoDB collection");
+
+    // Don't use now
     public static final String FORMAT = "format";
 
+    // Don't use now
     public static final String DEFAULT_FORMAT = "json";
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
index 22c3cce87..13f136aaa 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSink.java
@@ -56,7 +56,7 @@ public class MongodbSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, URI, 
DATABASE, COLLECTION);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URI.key(), 
DATABASE.key(), COLLECTION.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
new file mode 100644
index 000000000..af1c365e9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbSinkFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MongodbSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "MongoDB";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(URI, DATABASE, 
COLLECTION).build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
index 247594e08..6b2422830 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
@@ -19,7 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.mongodb.source;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
 import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.SCHEMA;
 import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
@@ -56,15 +55,15 @@ public class MongodbSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, URI, 
DATABASE, COLLECTION);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, URI.key(), 
DATABASE.key(), COLLECTION.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
 
         this.params = ConfigBeanFactory.create(config, 
MongodbParameters.class);
 
-        if (config.hasPath(SCHEMA)) {
-            Config schema = config.getConfig(SCHEMA);
+        if (config.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+            Config schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
             this.rowType = 
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
         } else {
             this.rowType = SeaTunnelSchema.buildSimpleTextSchema();
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
new file mode 100644
index 000000000..77449bbdd
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MongodbSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "MongoDB";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(URI, DATABASE, COLLECTION, 
SeaTunnelSchema.SCHEMA).build();
+    }
+}

Reply via email to