This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 38254e3f2 [Connector-V2] [ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325)
38254e3f2 is described below

commit 38254e3f26fe38eeecdedd166e56cf25a43c73f2
Author: Hisoka <[email protected]>
AuthorDate: Wed Nov 9 11:09:26 2022 +0800

    [Connector-V2] [ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325)
    
    * [Connector-V2] [ElasticSearch] Add ElasticSearch Source/Sink Factory
    
    * [Connector-V2] [ES] Fix doc error
    
    * Update seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
    
    Co-authored-by: Zongwen Li <[email protected]>
    
    * Update seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
    
    Co-authored-by: Zongwen Li <[email protected]>
    
    Co-authored-by: Zongwen Li <[email protected]>
---
 docs/en/connector-v2/sink/Elasticsearch.md         | 22 +++++-----
 docs/en/connector-v2/source/Elasticsearch.md       | 10 ++++-
 .../seatunnel/common/schema/Schema.java}           | 14 +++++--
 .../seatunnel/common/schema/SeaTunnelSchema.java   |  5 ++-
 .../elasticsearch/client/EsRestClient.java         | 10 ++---
 .../config/EsClusterConnectionConfig.java          | 14 +++++--
 .../seatunnel/elasticsearch/config/SinkConfig.java | 32 ++++++---------
 .../elasticsearch/config/SourceConfig.java         | 40 ++++++++++++++++++
 .../elasticsearch/config/source/SourceConfig.java  | 30 --------------
 .../elasticsearch/constant/BulkConfig.java         | 39 ------------------
 .../seatunnel/elasticsearch/dto/IndexInfo.java     |  6 +--
 .../elasticsearch/sink/ElasticsearchSink.java      | 17 ++++++--
 .../sink/ElasticsearchSinkFactory.java             | 46 +++++++++++++++++++++
 .../sink/ElasticsearchSinkWriter.java              | 26 +++++++-----
 .../elasticsearch/source/ElasticsearchSource.java  | 10 ++---
 .../source/ElasticsearchSourceFactory.java         | 47 ++++++++++++++++++++++
 .../source/ElasticsearchSourceSplitEnumerator.java | 21 +++++-----
 .../FakeDataGeneratorTest.java                     |  2 +-
 .../file/hdfs/source/BaseHdfsFileSource.java       |  2 +-
 .../seatunnel/file/ftp/source/FtpFileSource.java   |  4 +-
 .../file/local/source/LocalFileSource.java         |  4 +-
 .../seatunnel/file/oss/source/OssFileSource.java   |  2 +-
 .../seatunnel/file/s3/source/S3FileSource.java     |  4 +-
 .../seatunnel/file/sftp/source/SftpFileSource.java |  4 +-
 .../seatunnel/neo4j/source/Neo4jSource.java        |  4 +-
 .../seatunnel/pulsar/source/PulsarSource.java      |  4 +-
 .../seatunnel/redis/source/RedisSource.java        |  4 +-
 27 files changed, 259 insertions(+), 164 deletions(-)

diff --git a/docs/en/connector-v2/sink/Elasticsearch.md b/docs/en/connector-v2/sink/Elasticsearch.md
index d8673b90d..d53dca003 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -19,16 +19,16 @@ Engine Supported
 
 ## Options
 
-| name           | type   | required | default value | 
-|----------------|--------|----------|---------------|
-| hosts          | array  | yes      | -             |
-| index          | string | yes      | -             |
-| index_type     | string | no       |               |
-| username       | string | no       |               |
-| password       | string | no       |               | 
-| max_retry_size | int    | no       | 3             |
-| max_batch_size | int    | no       | 10            |
-| common-options |        | no       | -             |
+| name            | type   | required | default value | 
+|-----------------|--------|----------|---------------|
+| hosts           | array  | yes      | -             |
+| index           | string | yes      | -             |
+| index_type      | string | no       |               |
+| username        | string | no       |               |
+| password        | string | no       |               | 
+| max_retry_count | int    | no       | 3             |
+| max_batch_size  | int    | no       | 10            |
+| common-options  |        | no       | -             |
 
 
 ### hosts [array]
@@ -47,7 +47,7 @@ x-pack username
 ### password [string]
 x-pack password
 
-### max_retry_size [int]
+### max_retry_count [int]
 one bulk request max try size
 
 ### max_batch_size [int]
diff --git a/docs/en/connector-v2/source/Elasticsearch.md b/docs/en/connector-v2/source/Elasticsearch.md
index f10f16eb0..8b3bcb644 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -20,14 +20,15 @@ support version >= 2.x and < 8.x.
 ## Options
 
 | name        | type   | required | default value | 
-|-------------|--------| -------- |---------------|
+|-------------|--------|----------|---------------|
 | hosts       | array  | yes      | -             |
 | username    | string | no       | -             |
 | password    | string | no       | -             |
 | index       | string | yes      | -             |
-| source      | array  | yes      | -             |
+| source      | array  | no       | -             |
 | scroll_time | string | no       | 1m            |
 | scroll_size | int    | no       | 100           |
+| schema      |        | no       | -             |
 
 
 
@@ -46,6 +47,7 @@ Elasticsearch index name, support * fuzzy matching.
 ### source [array]
 The fields of index.
 You can get the document id by specifying the field `_id`.If sink _id to other index,you need specify an alias for _id due to the Elasticsearch limit.
+If you don't configure `source`, you must configure `schema`.
 
 ### scroll_time [String]
 Amount of time Elasticsearch will keep the search context alive for scroll requests.
@@ -53,6 +55,10 @@ Amount of time Elasticsearch will keep the search context alive for scroll reque
 ### scroll_size [int]
 Maximum number of hits to be returned with each Elasticsearch scroll request.
 
+### schema
+The structure of the data, including field names and field types.
+If you don't configure `schema`, you must configure `source`.
+
 ## Examples
 simple
 ```hocon
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/Schema.java
similarity index 73%
rename from 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java
rename to 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/Schema.java
index 035b556b6..b0614dc73 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/Schema.java
@@ -15,12 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source;
+package org.apache.seatunnel.connectors.seatunnel.common.schema;
 
-public class SourceConfigDeaultConstant {
+import org.apache.seatunnel.api.configuration.util.OptionMark;
 
-    public static final String SCROLLL_TIME = "1m";
+import lombok.Data;
 
-    public static final int SCROLLL_SIZE = 100;
+import java.util.Map;
+
+@Data
+public class Schema {
+
+    @OptionMark(description = "The schema fields map")
+    private Map<String, String> fields;
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
index 144a02c24..10b73c7c4 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.common.schema;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
@@ -42,7 +44,8 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 public class SeaTunnelSchema implements Serializable {
-    public static final String SCHEMA = "schema";
+
+    public static final Option<Schema> SCHEMA = 
Options.key("schema").objectType(Schema.class).noDefaultValue().withDescription("SeaTunnel
 Schema");
     private static final String FIELD_KEY = "fields";
     private static final String SIMPLE_SCHEMA_FILED = "content";
     private final SeaTunnelRowType seaTunnelRowType;
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 809654b24..e8c21f8f1 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -71,13 +71,13 @@ public class EsRestClient {
     }
 
     public static EsRestClient createInstance(Config pluginConfig) {
-        List<String> hosts = 
pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS);
+        List<String> hosts = 
pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS.key());
         String username = null;
         String password = null;
-        if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME)) {
-            username = 
pluginConfig.getString(EsClusterConnectionConfig.USERNAME);
-            if (pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD)) {
-                password = 
pluginConfig.getString(EsClusterConnectionConfig.PASSWORD);
+        if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME.key())) {
+            username = 
pluginConfig.getString(EsClusterConnectionConfig.USERNAME.key());
+            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD.key())) {
+                password = 
pluginConfig.getString(EsClusterConnectionConfig.PASSWORD.key());
             }
         }
         return createInstance(hosts, username, password);
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
index fd482db5e..5fee80878 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
@@ -17,12 +17,20 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
 public class EsClusterConnectionConfig {
 
-    public static final String HOSTS = "hosts";
+    public static final Option<List<String>> HOSTS = 
Options.key("hosts").listType().noDefaultValue()
+        .withDescription("Elasticsearch cluster http address, the format is 
host:port, allowing multiple hosts to be specified. Such as [\"host1:9200\", 
\"host2:9200\"]");
 
-    public static final String USERNAME = "username";
+    public static final Option<String> USERNAME = 
Options.key("username").stringType().noDefaultValue()
+        .withDescription("x-pack username");
 
-    public static final String PASSWORD = "password";
+    public static final Option<String> PASSWORD = 
Options.key("password").stringType().noDefaultValue()
+        .withDescription("x-pack password");
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index 6dc753bb2..e819e618c 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -17,31 +17,23 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
 
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 
 public class SinkConfig {
 
-    public static final String INDEX = "index";
+    public static final Option<String> INDEX = 
Options.key("index").stringType().noDefaultValue()
+        .withDescription("Elasticsearch index name.Index support contains 
variables of field name,such as seatunnel_${age},and the field must appear at 
seatunnel row. If not, we will treat it as a normal index");
 
-    public static final String INDEX_TYPE = "index_type";
+    public static final Option<String> INDEX_TYPE = 
Options.key("index_type").stringType().noDefaultValue()
+        .withDescription("Elasticsearch index type, it is recommended not to 
specify in elasticsearch 6 and above");
 
-    public static final String USERNAME = "username";
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static final Option<Integer> MAX_BATCH_SIZE = 
Options.key("max_batch_size").intType().defaultValue(10)
+        .withDescription("batch bulk doc max size");
 
-    public static final String PASSWORD = "password";
-
-    public static final String HOSTS = "hosts";
-
-    public static final String MAX_BATCH_SIZE = "max_batch_size";
-
-    public static final String MAX_RETRY_SIZE = "max_retry_size";
-
-    public static void 
setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
-        if (pluginConfig.hasPath(MAX_BATCH_SIZE)) {
-            BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
-        }
-        if (pluginConfig.hasPath(MAX_RETRY_SIZE)) {
-            BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
-        }
-    }
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static final Option<Integer> MAX_RETRY_COUNT = 
Options.key("max_retry_count").intType().defaultValue(3)
+        .withDescription("one bulk request max try count");
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
new file mode 100644
index 000000000..11a1f6576
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+public class SourceConfig {
+
+    public static final Option<String> INDEX = 
Options.key("index").stringType().noDefaultValue()
+        .withDescription("Elasticsearch index name, support * fuzzy matching");
+
+    public static final Option<List<String>> SOURCE = 
Options.key("source").listType().noDefaultValue()
+        .withDescription("The fields of index. You can get the document id by 
specifying the field _id.If sink _id to other index,you need specify an alias 
for _id due to the Elasticsearch limit");
+
+    public static final Option<String> SCROLL_TIME = 
Options.key("scroll_time").stringType().defaultValue("1m")
+        .withDescription("Amount of time Elasticsearch will keep the search 
context alive for scroll requests");
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static final Option<Integer> SCROLL_SIZE = 
Options.key("scroll_size").intType().defaultValue(100)
+        .withDescription("Maximum number of hits to be returned with each 
Elasticsearch scroll request");
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java
deleted file mode 100644
index 804080307..000000000
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source;
-
-public class SourceConfig {
-
-    public static final String INDEX = "index";
-
-    public static final String SOURCE = "source";
-
-    public static final String SCROLL_TIME = "scroll_time";
-
-    public static final String SCROLL_SIZE = "scroll_size";
-
-}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
deleted file mode 100644
index dba8b8dd1..000000000
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
-
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
-
-/**
- * bulk es config
- */
-public class BulkConfig {
-    /**
-     * once bulk es include max document size
-     * {@link SinkConfig#MAX_BATCH_SIZE}
-     */
-    @SuppressWarnings("checkstyle:MagicNumber")
-    public static int MAX_BATCH_SIZE = 10;
-
-    /**
-     * the max retry size of bulk es
-     * {@link SinkConfig#MAX_RETRY_SIZE}
-     */
-    @SuppressWarnings("checkstyle:MagicNumber")
-    public static int MAX_RETRY_SIZE = 3;
-}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
index dad62cf74..73d5da08b 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -33,9 +33,9 @@ public class IndexInfo {
     private String type;
 
     public IndexInfo(Config pluginConfig) {
-        index = pluginConfig.getString(SinkConfig.INDEX);
-        if (pluginConfig.hasPath(SinkConfig.INDEX_TYPE)) {
-            type = pluginConfig.getString(SinkConfig.INDEX_TYPE);
+        index = pluginConfig.getString(SinkConfig.INDEX.key());
+        if (pluginConfig.hasPath(SinkConfig.INDEX_TYPE.key())) {
+            type = pluginConfig.getString(SinkConfig.INDEX_TYPE.key());
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index 3924c7199..48b7d2ed1 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -17,13 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
+
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
@@ -41,6 +43,10 @@ public class ElasticsearchSink implements 
SeaTunnelSink<SeaTunnelRow, Elasticsea
     private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
 
+    private int maxBatchSize = MAX_BATCH_SIZE.defaultValue();
+
+    private int maxRetryCount = MAX_RETRY_COUNT.defaultValue();
+
     @Override
     public String getPluginName() {
         return "Elasticsearch";
@@ -49,7 +55,12 @@ public class ElasticsearchSink implements 
SeaTunnelSink<SeaTunnelRow, Elasticsea
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         this.pluginConfig = pluginConfig;
-        SinkConfig.setValue(pluginConfig);
+        if (pluginConfig.hasPath(MAX_BATCH_SIZE.key())) {
+            maxBatchSize = pluginConfig.getInt(MAX_BATCH_SIZE.key());
+        }
+        if (pluginConfig.hasPath(MAX_RETRY_COUNT.key())) {
+            maxRetryCount = pluginConfig.getInt(MAX_RETRY_COUNT.key());
+        }
     }
 
     @Override
@@ -64,7 +75,7 @@ public class ElasticsearchSink implements 
SeaTunnelSink<SeaTunnelRow, Elasticsea
 
     @Override
     public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, 
ElasticsearchSinkState> createWriter(SinkWriter.Context context) {
-        return new ElasticsearchSinkWriter(context, seaTunnelRowType, 
pluginConfig, Collections.emptyList());
+        return new ElasticsearchSinkWriter(context, seaTunnelRowType, 
pluginConfig, maxBatchSize, maxRetryCount, Collections.emptyList());
     }
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
new file mode 100644
index 000000000..5f5778e76
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ElasticsearchSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Elasticsearch";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(HOSTS, INDEX)
+            .optional(INDEX_TYPE, USERNAME, PASSWORD, MAX_RETRY_COUNT, 
MAX_BATCH_SIZE).build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 1bc2ef55f..682afec58 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
@@ -40,39 +39,46 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
-@Slf4j
 /**
  * ElasticsearchSinkWriter is a sink writer that will write {@link 
SeaTunnelRow} to Elasticsearch.
  */
+@Slf4j
 public class ElasticsearchSinkWriter implements SinkWriter<SeaTunnelRow, 
ElasticsearchCommitInfo, ElasticsearchSinkState> {
 
     private final SinkWriter.Context context;
 
+    private final int maxBatchSize;
+
+    private final int maxRetryCount;
+
     private final SeaTunnelRowSerializer seaTunnelRowSerializer;
     private final List<String> requestEsList;
     private EsRestClient esRestClient;
 
     public ElasticsearchSinkWriter(
-            SinkWriter.Context context,
-            SeaTunnelRowType seaTunnelRowType,
-            Config pluginConfig,
-            List<ElasticsearchSinkState> elasticsearchStates) {
+        SinkWriter.Context context,
+        SeaTunnelRowType seaTunnelRowType,
+        Config pluginConfig,
+        int maxBatchSize, int maxRetryCount,
+        List<ElasticsearchSinkState> elasticsearchStates) {
         this.context = context;
+        this.maxBatchSize = maxBatchSize;
+        this.maxRetryCount = maxRetryCount;
 
         IndexInfo indexInfo = new IndexInfo(pluginConfig);
         esRestClient = EsRestClient.createInstance(pluginConfig);
         ElasticsearchVersion elasticsearchVersion = 
ElasticsearchVersion.get(esRestClient.getClusterVersion());
         this.seaTunnelRowSerializer = new 
ElasticsearchRowSerializer(elasticsearchVersion, indexInfo, seaTunnelRowType);
 
-        this.requestEsList = new ArrayList<>(BulkConfig.MAX_BATCH_SIZE);
+        this.requestEsList = new ArrayList<>(maxBatchSize);
     }
 
     @Override
     public void write(SeaTunnelRow element) {
         String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
         requestEsList.add(indexRequestRow);
-        if (requestEsList.size() >= BulkConfig.MAX_BATCH_SIZE) {
-            bulkEsWithRetry(this.esRestClient, this.requestEsList, 
BulkConfig.MAX_RETRY_SIZE);
+        if (requestEsList.size() >= maxBatchSize) {
+            bulkEsWithRetry(this.esRestClient, this.requestEsList, 
maxRetryCount);
             requestEsList.clear();
         }
     }
@@ -110,7 +116,7 @@ public class ElasticsearchSinkWriter implements 
SinkWriter<SeaTunnelRow, Elastic
 
     @Override
     public void close() throws IOException {
-        bulkEsWithRetry(this.esRestClient, this.requestEsList, 
BulkConfig.MAX_RETRY_SIZE);
+        bulkEsWithRetry(this.esRestClient, this.requestEsList, maxRetryCount);
         esRestClient.close();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index 674bc2da4..9339828c8 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.EsTypeMappingSeaTunnelType;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -56,14 +56,14 @@ public class ElasticsearchSource implements 
SeaTunnelSource<SeaTunnelRow, Elasti
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         this.pluginConfig = pluginConfig;
-        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
-            Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+            Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
             rowTypeInfo = 
SeaTunnelSchema.buildWithConfig(schemaConfig).getSeaTunnelRowType();
             source = Arrays.asList(rowTypeInfo.getFieldNames());
         } else {
-            source = pluginConfig.getStringList(SourceConfig.SOURCE);
+            source = pluginConfig.getStringList(SourceConfig.SOURCE.key());
             EsRestClient esRestClient = 
EsRestClient.createInstance(this.pluginConfig);
-            Map<String, String> esFieldType = 
esRestClient.getFieldTypeMapping(pluginConfig.getString(SourceConfig.INDEX), 
source);
+            Map<String, String> esFieldType = 
esRestClient.getFieldTypeMapping(pluginConfig.getString(SourceConfig.INDEX.key()),
 source);
             esRestClient.close();
             SeaTunnelDataType[] fieldTypes = new 
SeaTunnelDataType[source.size()];
             for (int i = 0; i < source.size(); i++) {
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
new file mode 100644
index 000000000..1f00a5b21
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_TIME;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SOURCE;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class) // AutoService registers this class as a provider of the Factory service interface
+public class ElasticsearchSourceFactory implements TableSourceFactory { // declares the identifier and option rules of the Elasticsearch source
+    @Override
+    public String factoryIdentifier() { // name under which this factory is identified
+        return "Elasticsearch";
+    }
+
+    @Override
+    public OptionRule optionRule() { // required: HOSTS, INDEX; optional: USERNAME, PASSWORD, SCROLL_TIME, SCROLL_SIZE
+        return OptionRule.builder().required(HOSTS, INDEX).optional(USERNAME, 
PASSWORD, SCROLL_TIME, SCROLL_SIZE)
+            .exclusive(SOURCE, SeaTunnelSchema.SCHEMA).build(); // SOURCE and SCHEMA are declared via exclusive(...); presumably only one of them may be set - confirm against OptionRule
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index 2ba7c58ac..fafd749fd 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -19,8 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfigDeaultConstant;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;
 
@@ -131,20 +130,20 @@ public class ElasticsearchSourceSplitEnumerator 
implements SourceSplitEnumerator
 
     private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
         List<ElasticsearchSourceSplit> splits = new ArrayList<>();
-        String scrolllTime = SourceConfigDeaultConstant.SCROLLL_TIME;
-        if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME)) {
-            scrolllTime = pluginConfig.getString(SourceConfig.SCROLL_TIME);
+        String scrollTime = SourceConfig.SCROLL_TIME.defaultValue();
+        if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME.key())) {
+            scrollTime = 
pluginConfig.getString(SourceConfig.SCROLL_TIME.key());
         }
-        int scrollSize = SourceConfigDeaultConstant.SCROLLL_SIZE;
-        if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE)) {
-            scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE);
+        int scrollSize = SourceConfig.SCROLL_SIZE.defaultValue();
+        if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE.key())) {
+            scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE.key());
         }
 
-        List<IndexDocsCount> indexDocsCounts = 
esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX));
+        List<IndexDocsCount> indexDocsCounts = 
esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX.key()));
         indexDocsCounts = indexDocsCounts.stream().filter(x -> 
x.getDocsCount() != null && x.getDocsCount() > 0)
-                
.sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)).collect(Collectors.toList());
+            
.sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)).collect(Collectors.toList());
         for (IndexDocsCount indexDocsCount : indexDocsCounts) {
-            splits.add(new 
ElasticsearchSourceSplit(String.valueOf(indexDocsCount.getIndex().hashCode()), 
new SourceIndexInfo(indexDocsCount.getIndex(), source, scrolllTime, 
scrollSize)));
+            splits.add(new 
ElasticsearchSourceSplit(String.valueOf(indexDocsCount.getIndex().hashCode()), 
new SourceIndexInfo(indexDocsCount.getIndex(), source, scrollTime, 
scrollSize)));
         }
         return splits;
     }
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
index 15cf1592e..6e58e30ba 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
@@ -43,7 +43,7 @@ public class FakeDataGeneratorTest {
     @ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"})
     public void testComplexSchemaParse(String conf) throws 
FileNotFoundException, URISyntaxException {
         Config testConfig = getTestConfigFile(conf);
-        SeaTunnelSchema seaTunnelSchema = 
SeaTunnelSchema.buildWithConfig(testConfig.getConfig(SeaTunnelSchema.SCHEMA));
+        SeaTunnelSchema seaTunnelSchema = 
SeaTunnelSchema.buildWithConfig(testConfig.getConfig(SeaTunnelSchema.SCHEMA.key()));
         SeaTunnelRowType seaTunnelRowType = 
seaTunnelSchema.getSeaTunnelRowType();
         FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
         FakeDataGenerator fakeDataGenerator = new 
FakeDataGenerator(seaTunnelSchema, fakeConfig);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 311a6dd04..161c3a921 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -59,7 +59,7 @@ public abstract class BaseHdfsFileSource extends 
BaseFileSource {
                 case CSV:
                 case TEXT:
                 case JSON:
-                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
                     SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index ab34a6c2b..abcb511d5 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -68,12 +68,12 @@ public class FtpFileSource extends BaseFileSource {
         }
         // support user-defined schema
         // only json type support user-defined schema now
-        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
             switch (fileFormat) {
                 case CSV:
                 case TEXT:
                 case JSON:
-                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
                     SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index df04acde4..6cb4f9fa3 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -65,12 +65,12 @@ public class LocalFileSource extends BaseFileSource {
         // support user-defined schema
         FileFormat fileFormat = 
FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_TYPE).toUpperCase());
         // only json text csv type support user-defined schema now
-        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
             switch (fileFormat) {
                 case CSV:
                 case TEXT:
                 case JSON:
-                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
                     SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index a99d0e2ab..ec3926d4f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -71,7 +71,7 @@ public class OssFileSource extends BaseFileSource {
                 case CSV:
                 case TEXT:
                 case JSON:
-                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
                     SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index d77c3d4e5..76d584c89 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -65,12 +65,12 @@ public class S3FileSource extends BaseFileSource {
         // support user-defined schema
         FileFormat fileFormat = 
FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE).toUpperCase());
         // only json text csv type support user-defined schema now
-        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
             switch (fileFormat) {
                 case CSV:
                 case TEXT:
                 case JSON:
-                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
                     SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index 7bf20ebeb..39c27739d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -68,12 +68,12 @@ public class SftpFileSource extends BaseFileSource {
         }
         // support user-defined schema
         // only json csv text type support user-defined schema now
-        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
             switch (fileFormat) {
                 case CSV:
                 case TEXT:
                 case JSON:
-                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    Config schemaConfig = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
                     SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
                             .buildWithConfig(schemaConfig)
                             .getSeaTunnelRowType();
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
index dc52b91ad..26c9e524e 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
@@ -66,13 +66,13 @@ public class Neo4jSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     public void prepare(Config pluginConfig) throws PrepareFailException {
         neo4jSourceConfig.setDriverBuilder(prepareDriver(pluginConfig));
 
-        final CheckResult configCheck = 
CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY, SeaTunnelSchema.SCHEMA);
+        final CheckResult configCheck = 
CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY, 
SeaTunnelSchema.SCHEMA.key());
         if (!configCheck.isSuccess()) {
             throw new PrepareFailException(Neo4jSourceConfig.PLUGIN_NAME, 
PluginType.SOURCE, configCheck.getMsg());
         }
         neo4jSourceConfig.setQuery(pluginConfig.getString(KEY_QUERY));
 
-        this.rowType = 
SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(SeaTunnelSchema.SCHEMA)).getSeaTunnelRowType();
+        this.rowType = 
SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key())).getSeaTunnelRowType();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index d0c481569..a52d63b2b 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -152,7 +152,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T, 
PulsarPartitionSplit,
         setPartitionDiscoverer(config);
         setDeserialization(config);
 
-        if ((partitionDiscoverer instanceof TopicPatternDiscoverer)
+        if (partitionDiscoverer instanceof TopicPatternDiscoverer
             && partitionDiscoveryIntervalMs > 0
             && Boundedness.BOUNDED == stopCursor.getBoundedness()) {
             throw new IllegalArgumentException("Bounded streams do not support 
dynamic partition discovery.");
@@ -226,7 +226,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T, 
PulsarPartitionSplit,
     private void setDeserialization(Config config) {
         String format = config.getString("format");
         // TODO: format SPI
-        SeaTunnelRowType rowType = 
SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA)).getSeaTunnelRowType();
+        SeaTunnelRowType rowType = 
SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA.key())).getSeaTunnelRowType();
         deserialization = (DeserializationSchema<T>) new 
JsonDeserializationSchema(false, false, rowType);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index 407d32222..2fe258d26 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -57,8 +57,8 @@ public class RedisSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
         this.redisParameters.buildWithConfig(pluginConfig);
-        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
-            Config schema = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+            Config schema = 
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
             this.seaTunnelRowType = 
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
         } else {
             this.seaTunnelRowType = SeaTunnelSchema.buildSimpleTextSchema();

Reply via email to