This is an automated email from the ASF dual-hosted git repository.

jarvis pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d307ab44f2 [improve] add Elasticsearch options (#8623)
d307ab44f2 is described below

commit d307ab44f2c3e5f0640a1b30fa8642d460f20dbc
Author: fcb-xiaobo <[email protected]>
AuthorDate: Fri Feb 7 22:14:19 2025 +0800

    [improve] add Elasticsearch options (#8623)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  2 -
 .../elasticsearch/client/EsRestClient.java         | 20 +++----
 ...onConfig.java => ElasticsearchBaseOptions.java} |  9 ++-
 .../elasticsearch/config/ElasticsearchConfig.java  | 53 +++++++++++++++++
 ...nkConfig.java => ElasticsearchSinkOptions.java} |  9 +--
 ...Config.java => ElasticsearchSourceOptions.java} | 30 +---------
 .../seatunnel/elasticsearch/dto/IndexInfo.java     | 10 ++--
 .../elasticsearch/sink/ElasticsearchSink.java      | 10 ++--
 .../sink/ElasticsearchSinkFactory.java             | 38 +++++++------
 .../elasticsearch/source/ElasticsearchSource.java  | 66 +++++++++++-----------
 .../source/ElasticsearchSourceFactory.java         | 28 ++++-----
 .../source/ElasticsearchSourceReader.java          | 10 ++--
 .../source/ElasticsearchSourceSplit.java           |  6 +-
 .../source/ElasticsearchSourceSplitEnumerator.java | 18 +++---
 .../serialize/ElasticsearchRowSerializerTest.java  | 16 +++---
 15 files changed, 177 insertions(+), 148 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index a473f0f685..92acb42a5f 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -169,7 +169,6 @@ public class ConnectorOptionCheckTest {
 
     private Set<String> buildWhiteList() {
         Set<String> whiteList = new HashSet<>();
-        whiteList.add("ElasticsearchSourceOptions");
         whiteList.add("JdbcSinkOptions");
         whiteList.add("TypesenseSourceOptions");
         whiteList.add("RabbitmqSourceOptions");
@@ -202,7 +201,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("SentrySinkOptions");
         whiteList.add("EasysearchSinkOptions");
         whiteList.add("QdrantSinkOptions");
-        whiteList.add("ElasticsearchSinkOptions");
         whiteList.add("MilvusSourceOptions");
         whiteList.add("RocketMqSinkOptions");
         whiteList.add("ClickhouseFileSinkOptions");
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 6b698531c4..ab4c35b1a6 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -26,7 +26,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.common.utils.JsonUtils;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
@@ -92,23 +92,23 @@ public class EsRestClient implements Closeable {
     }
 
     public static EsRestClient createInstance(ReadonlyConfig config) {
-        List<String> hosts = config.get(EsClusterConnectionConfig.HOSTS);
-        Optional<String> username = 
config.getOptional(EsClusterConnectionConfig.USERNAME);
-        Optional<String> password = 
config.getOptional(EsClusterConnectionConfig.PASSWORD);
+        List<String> hosts = config.get(ElasticsearchBaseOptions.HOSTS);
+        Optional<String> username = 
config.getOptional(ElasticsearchBaseOptions.USERNAME);
+        Optional<String> password = 
config.getOptional(ElasticsearchBaseOptions.PASSWORD);
         Optional<String> keystorePath = Optional.empty();
         Optional<String> keystorePassword = Optional.empty();
         Optional<String> truststorePath = Optional.empty();
         Optional<String> truststorePassword = Optional.empty();
-        boolean tlsVerifyCertificate = 
config.get(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE);
+        boolean tlsVerifyCertificate = 
config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE);
         if (tlsVerifyCertificate) {
-            keystorePath = 
config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PATH);
-            keystorePassword = 
config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD);
-            truststorePath = 
config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH);
+            keystorePath = 
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH);
+            keystorePassword = 
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD);
+            truststorePath = 
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH);
             truststorePassword =
-                    
config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD);
+                    
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD);
         }
 
-        boolean tlsVerifyHostnames = 
config.get(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME);
+        boolean tlsVerifyHostnames = 
config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME);
         return createInstance(
                 hosts,
                 username,
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
similarity index 87%
rename from 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
rename to 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
index c544807fbf..c5e688443e 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
@@ -20,9 +20,10 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
+import java.io.Serializable;
 import java.util.List;
 
-public class EsClusterConnectionConfig {
+public class ElasticsearchBaseOptions implements Serializable {
 
     public static final Option<List<String>> HOSTS =
             Options.key("hosts")
@@ -37,6 +38,12 @@ public class EsClusterConnectionConfig {
                     .noDefaultValue()
                     .withDescription("x-pack username");
 
+    public static final Option<String> INDEX =
+            Options.key("index")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Elasticsearch index name.Index support contains 
variables of field name,such as seatunnel_${age},and the field must appear at 
seatunnel row. If not, we will treat it as a normal index");
     public static final Option<String> PASSWORD =
             Options.key("password")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
new file mode 100644
index 0000000000..2ff64ee743
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Setter
+public class ElasticsearchConfig implements Serializable {
+
+    private String index;
+    private List<String> source;
+    private Map<String, Object> query;
+    private String scrollTime;
+    private int scrollSize;
+
+    private CatalogTable catalogTable;
+
+    public ElasticsearchConfig clone() {
+        ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig();
+        elasticsearchConfig.setIndex(index);
+        elasticsearchConfig.setSource(new ArrayList<>(source));
+        elasticsearchConfig.setQuery(new HashMap<>(query));
+        elasticsearchConfig.setScrollTime(scrollTime);
+        elasticsearchConfig.setScrollSize(scrollSize);
+        elasticsearchConfig.setCatalogTable(catalogTable);
+        return elasticsearchConfig;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
similarity index 88%
rename from 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
rename to 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
index fdb0300aab..22ab5e0d51 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
@@ -29,14 +29,7 @@ import static 
org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
 import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
 import static 
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
 
-public class SinkConfig {
-
-    public static final Option<String> INDEX =
-            Options.key("index")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Elasticsearch index name.Index support contains 
variables of field name,such as seatunnel_${age},and the field must appear at 
seatunnel row. If not, we will treat it as a normal index");
+public class ElasticsearchSinkOptions extends ElasticsearchBaseOptions {
 
     public static final Option<String> INDEX_TYPE =
             Options.key("index_type")
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
similarity index 76%
rename from 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
rename to 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
index ffeb69d67f..a0cee3010e 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
@@ -21,13 +21,10 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
 
 import lombok.Getter;
 import lombok.Setter;
 
-import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -35,7 +32,7 @@ import java.util.Map;
 
 @Getter
 @Setter
-public class SourceConfig implements Serializable {
+public class ElasticsearchSourceOptions extends ElasticsearchBaseOptions {
 
     public static final Option<List<Map<String, Object>>> INDEX_LIST =
             Options.key("index_list")
@@ -43,12 +40,6 @@ public class SourceConfig implements Serializable {
                     .noDefaultValue()
                     .withDescription("index_list for multiTable sync");
 
-    public static final Option<String> INDEX =
-            Options.key("index")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Elasticsearch index name, support * 
fuzzy matching");
-
     public static final Option<List<String>> SOURCE =
             Options.key("source")
                     .listType()
@@ -84,23 +75,4 @@ public class SourceConfig implements Serializable {
                             Collections.singletonMap("match_all", new 
HashMap<String, String>()))
                     .withDescription(
                             "Elasticsearch query language. You can control the 
range of data read");
-
-    private String index;
-    private List<String> source;
-    private Map<String, Object> query;
-    private String scrollTime;
-    private int scrollSize;
-
-    private CatalogTable catalogTable;
-
-    public SourceConfig clone() {
-        SourceConfig sourceConfig = new SourceConfig();
-        sourceConfig.setIndex(index);
-        sourceConfig.setSource(new ArrayList<>(source));
-        sourceConfig.setQuery(new HashMap<>(query));
-        sourceConfig.setScrollTime(scrollTime);
-        sourceConfig.setScrollSize(scrollSize);
-        sourceConfig.setCatalogTable(catalogTable);
-        return sourceConfig;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
index 67226341b5..5de741fc10 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
 
 import lombok.Data;
 
@@ -33,10 +33,10 @@ public class IndexInfo {
 
     public IndexInfo(String index, ReadonlyConfig config) {
         this.index = index;
-        type = config.get(SinkConfig.INDEX_TYPE);
-        if (config.getOptional(SinkConfig.PRIMARY_KEYS).isPresent()) {
-            primaryKeys = config.get(SinkConfig.PRIMARY_KEYS).toArray(new 
String[0]);
+        type = config.get(ElasticsearchSinkOptions.INDEX_TYPE);
+        if 
(config.getOptional(ElasticsearchSinkOptions.PRIMARY_KEYS).isPresent()) {
+            primaryKeys = 
config.get(ElasticsearchSinkOptions.PRIMARY_KEYS).toArray(new String[0]);
         }
-        keyDelimiter = config.get(SinkConfig.KEY_DELIMITER);
+        keyDelimiter = config.get(ElasticsearchSinkOptions.KEY_DELIMITER);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index b5dea4696b..5841e2ad3d 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -33,7 +33,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.schema.SchemaChangeType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
@@ -43,8 +43,8 @@ import java.util.List;
 import java.util.Optional;
 
 import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_RETRY_COUNT;
 
 public class ElasticsearchSink
         implements SeaTunnelSink<
@@ -92,8 +92,8 @@ public class ElasticsearchSink
             return Optional.empty();
         }
         Catalog catalog = 
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
-        SchemaSaveMode schemaSaveMode = 
config.get(SinkConfig.SCHEMA_SAVE_MODE);
-        DataSaveMode dataSaveMode = config.get(SinkConfig.DATA_SAVE_MODE);
+        SchemaSaveMode schemaSaveMode = 
config.get(ElasticsearchSinkOptions.SCHEMA_SAVE_MODE);
+        DataSaveMode dataSaveMode = 
config.get(ElasticsearchSinkOptions.DATA_SAVE_MODE);
 
         TablePath tablePath = TablePath.of("", 
catalogTable.getTableId().getTableName());
         return Optional.of(
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index b290a63c44..4ba4bdb5a8 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -26,25 +26,25 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX_TYPE;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.KEY_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.PRIMARY_KEYS;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.HOSTS;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.INDEX;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.INDEX_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.KEY_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_RETRY_COUNT;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.PRIMARY_KEYS;
 
 @AutoService(Factory.class)
 public class ElasticsearchSinkFactory implements TableSinkFactory {
@@ -56,7 +56,11 @@ public class ElasticsearchSinkFactory implements 
TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(HOSTS, INDEX, SinkConfig.SCHEMA_SAVE_MODE, 
SinkConfig.DATA_SAVE_MODE)
+                .required(
+                        HOSTS,
+                        INDEX,
+                        ElasticsearchSinkOptions.SCHEMA_SAVE_MODE,
+                        ElasticsearchSinkOptions.DATA_SAVE_MODE)
                 .optional(
                         INDEX_TYPE,
                         PRIMARY_KEYS,
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index a22ca17956..133a2c020b 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -39,7 +39,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
 
@@ -61,13 +62,13 @@ public class ElasticsearchSource
                 SupportParallelism,
                 SupportColumnProjection {
 
-    private final List<SourceConfig> sourceConfigList;
+    private final List<ElasticsearchConfig> elasticsearchConfigList;
     private final ReadonlyConfig connectionConfig;
 
     public ElasticsearchSource(ReadonlyConfig config) {
         this.connectionConfig = config;
-        boolean multiSource = 
config.getOptional(SourceConfig.INDEX_LIST).isPresent();
-        boolean singleSource = 
config.getOptional(SourceConfig.INDEX).isPresent();
+        boolean multiSource = 
config.getOptional(ElasticsearchSourceOptions.INDEX_LIST).isPresent();
+        boolean singleSource = 
config.getOptional(ElasticsearchSourceOptions.INDEX).isPresent();
         if (multiSource && singleSource) {
             log.warn(
                     "Elasticsearch Source config warn: when both 'index' and 
'index_list' are present in the configuration, only the 'index_list' 
configuration will take effect");
@@ -78,28 +79,29 @@ public class ElasticsearchSource
                     
ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01.getDescription());
         }
         if (multiSource) {
-            this.sourceConfigList = createMultiSource(config);
+            this.elasticsearchConfigList = createMultiSource(config);
         } else {
-            this.sourceConfigList = 
Collections.singletonList(parseOneIndexQueryConfig(config));
+            this.elasticsearchConfigList =
+                    
Collections.singletonList(parseOneIndexQueryConfig(config));
         }
     }
 
-    private List<SourceConfig> createMultiSource(ReadonlyConfig config) {
-        List<Map<String, Object>> configMaps = 
config.get(SourceConfig.INDEX_LIST);
+    private List<ElasticsearchConfig> createMultiSource(ReadonlyConfig config) 
{
+        List<Map<String, Object>> configMaps = 
config.get(ElasticsearchSourceOptions.INDEX_LIST);
         List<ReadonlyConfig> configList =
                 
configMaps.stream().map(ReadonlyConfig::fromMap).collect(Collectors.toList());
-        List<SourceConfig> sourceConfigList = new 
ArrayList<>(configList.size());
+        List<ElasticsearchConfig> elasticsearchConfigList = new 
ArrayList<>(configList.size());
         for (ReadonlyConfig readonlyConfig : configList) {
-            SourceConfig sourceConfig = 
parseOneIndexQueryConfig(readonlyConfig);
-            sourceConfigList.add(sourceConfig);
+            ElasticsearchConfig elasticsearchConfig = 
parseOneIndexQueryConfig(readonlyConfig);
+            elasticsearchConfigList.add(elasticsearchConfig);
         }
-        return sourceConfigList;
+        return elasticsearchConfigList;
     }
 
-    private SourceConfig parseOneIndexQueryConfig(ReadonlyConfig 
readonlyConfig) {
+    private ElasticsearchConfig parseOneIndexQueryConfig(ReadonlyConfig 
readonlyConfig) {
 
-        Map<String, Object> query = readonlyConfig.get(SourceConfig.QUERY);
-        String index = readonlyConfig.get(SourceConfig.INDEX);
+        Map<String, Object> query = 
readonlyConfig.get(ElasticsearchSourceOptions.QUERY);
+        String index = readonlyConfig.get(ElasticsearchSourceOptions.INDEX);
 
         CatalogTable catalogTable;
         List<String> source;
@@ -112,8 +114,8 @@ public class ElasticsearchSource
             catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
             source = 
Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
         } else {
-            source = readonlyConfig.get(SourceConfig.SOURCE);
-            arrayColumn = readonlyConfig.get(SourceConfig.ARRAY_COLUMN);
+            source = readonlyConfig.get(ElasticsearchSourceOptions.SOURCE);
+            arrayColumn = 
readonlyConfig.get(ElasticsearchSourceOptions.ARRAY_COLUMN);
             Map<String, BasicTypeDefine<EsType>> esFieldType = 
getFieldTypeMapping(index, source);
             if (CollectionUtils.isEmpty(source)) {
                 source = new ArrayList<>(esFieldType.keySet());
@@ -154,17 +156,17 @@ public class ElasticsearchSource
                             "");
         }
 
-        String scrollTime = readonlyConfig.get(SourceConfig.SCROLL_TIME);
-        int scrollSize = readonlyConfig.get(SourceConfig.SCROLL_SIZE);
-        SourceConfig sourceConfig = new SourceConfig();
-        sourceConfig.setSource(source);
-        sourceConfig.setCatalogTable(catalogTable);
-        sourceConfig.setQuery(query);
-        sourceConfig.setScrollTime(scrollTime);
-        sourceConfig.setScrollSize(scrollSize);
-        sourceConfig.setIndex(index);
-        sourceConfig.setCatalogTable(catalogTable);
-        return sourceConfig;
+        String scrollTime = 
readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_TIME);
+        int scrollSize = 
readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_SIZE);
+        ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig();
+        elasticsearchConfig.setSource(source);
+        elasticsearchConfig.setCatalogTable(catalogTable);
+        elasticsearchConfig.setQuery(query);
+        elasticsearchConfig.setScrollTime(scrollTime);
+        elasticsearchConfig.setScrollSize(scrollSize);
+        elasticsearchConfig.setIndex(index);
+        elasticsearchConfig.setCatalogTable(catalogTable);
+        return elasticsearchConfig;
     }
 
     @Override
@@ -179,8 +181,8 @@ public class ElasticsearchSource
 
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
-        return sourceConfigList.stream()
-                .map(SourceConfig::getCatalogTable)
+        return elasticsearchConfigList.stream()
+                .map(ElasticsearchConfig::getCatalogTable)
                 .collect(Collectors.toList());
     }
 
@@ -195,7 +197,7 @@ public class ElasticsearchSource
             createEnumerator(
                     SourceSplitEnumerator.Context<ElasticsearchSourceSplit> 
enumeratorContext) {
         return new ElasticsearchSourceSplitEnumerator(
-                enumeratorContext, connectionConfig, sourceConfigList);
+                enumeratorContext, connectionConfig, elasticsearchConfigList);
     }
 
     @Override
@@ -204,7 +206,7 @@ public class ElasticsearchSource
                     SourceSplitEnumerator.Context<ElasticsearchSourceSplit> 
enumeratorContext,
                     ElasticsearchSourceState sourceState) {
         return new ElasticsearchSourceSplitEnumerator(
-                enumeratorContext, sourceState, connectionConfig, 
sourceConfigList);
+                enumeratorContext, sourceState, connectionConfig, 
elasticsearchConfigList);
     }
 
     @VisibleForTesting
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index 8f41256e37..5485e13fad 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -29,20 +29,20 @@ import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX_LIST;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.QUERY;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_TIME;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.HOSTS;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.INDEX;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.INDEX_LIST;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.QUERY;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SCROLL_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SCROLL_TIME;
 
 @AutoService(Factory.class)
 public class ElasticsearchSourceFactory implements TableSourceFactory {
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index a58c2c622d..d46dc5a1b4 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.ElasticsearchRecord;
@@ -76,7 +76,7 @@ public class ElasticsearchSourceReader
                 SeaTunnelRowType seaTunnelRowType = 
split.getSeaTunnelRowType();
                 SeaTunnelRowDeserializer deserializer =
                         new DefaultSeaTunnelRowDeserializer(seaTunnelRowType);
-                SourceConfig sourceIndexInfo = split.getSourceConfig();
+                ElasticsearchConfig sourceIndexInfo = 
split.getElasticsearchConfig();
                 ScrollResult scrollResult =
                         esRestClient.searchByScroll(
                                 sourceIndexInfo.getIndex(),
@@ -103,11 +103,11 @@ public class ElasticsearchSourceReader
 
     private void outputFromScrollResult(
             ScrollResult scrollResult,
-            SourceConfig sourceConfig,
+            ElasticsearchConfig elasticsearchConfig,
             Collector<SeaTunnelRow> output,
             SeaTunnelRowDeserializer deserializer) {
-        List<String> source = sourceConfig.getSource();
-        String tableId = 
sourceConfig.getCatalogTable().getTablePath().toString();
+        List<String> source = elasticsearchConfig.getSource();
+        String tableId = 
elasticsearchConfig.getCatalogTable().getTablePath().toString();
         for (Map<String, Object> doc : scrollResult.getDocs()) {
             SeaTunnelRow seaTunnelRow =
                     deserializer.deserialize(new ElasticsearchRecord(doc, 
source, tableId));
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
index 3c7d25b5b4..8a0aa99740 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
@@ -33,10 +33,10 @@ public class ElasticsearchSourceSplit implements 
SourceSplit {
 
     private String splitId;
 
-    @Getter private SourceConfig sourceConfig;
+    @Getter private ElasticsearchConfig elasticsearchConfig;
 
     public SeaTunnelRowType getSeaTunnelRowType() {
-        return sourceConfig.getCatalogTable().getSeaTunnelRowType();
+        return elasticsearchConfig.getCatalogTable().getSeaTunnelRowType();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index 5e3356ebd6..7452e630d3 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
 
@@ -52,22 +52,22 @@ public class ElasticsearchSourceSplitEnumerator
 
     private Map<Integer, List<ElasticsearchSourceSplit>> pendingSplit;
 
-    private final List<SourceConfig> sourceConfigs;
+    private final List<ElasticsearchConfig> elasticsearchConfigs;
 
     private volatile boolean shouldEnumerate;
 
     public ElasticsearchSourceSplitEnumerator(
             SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
             ReadonlyConfig connConfig,
-            List<SourceConfig> sourceConfigs) {
-        this(context, null, connConfig, sourceConfigs);
+            List<ElasticsearchConfig> elasticsearchConfigs) {
+        this(context, null, connConfig, elasticsearchConfigs);
     }
 
     public ElasticsearchSourceSplitEnumerator(
             SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
             ElasticsearchSourceState sourceState,
             ReadonlyConfig connConfig,
-            List<SourceConfig> sourceConfigs) {
+            List<ElasticsearchConfig> elasticsearchConfigs) {
         this.context = context;
         this.connConfig = connConfig;
         this.pendingSplit = new HashMap<>();
@@ -76,7 +76,7 @@ public class ElasticsearchSourceSplitEnumerator
             this.shouldEnumerate = sourceState.isShouldEnumerate();
             this.pendingSplit.putAll(sourceState.getPendingSplit());
         }
-        this.sourceConfigs = sourceConfigs;
+        this.elasticsearchConfigs = elasticsearchConfigs;
     }
 
     @Override
@@ -139,9 +139,9 @@ public class ElasticsearchSourceSplitEnumerator
 
     private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
         List<ElasticsearchSourceSplit> splits = new ArrayList<>();
-        for (SourceConfig sourceConfig : sourceConfigs) {
+        for (ElasticsearchConfig elasticsearchConfig : elasticsearchConfigs) {
 
-            String index = sourceConfig.getIndex();
+            String index = elasticsearchConfig.getIndex();
             List<IndexDocsCount> indexDocsCounts = 
esRestClient.getIndexDocsCount(index);
             indexDocsCounts =
                     indexDocsCounts.stream()
@@ -149,7 +149,7 @@ public class ElasticsearchSourceSplitEnumerator
                             
.sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount))
                             .collect(Collectors.toList());
             for (IndexDocsCount indexDocsCount : indexDocsCounts) {
-                SourceConfig cloneCfg = sourceConfig.clone();
+                ElasticsearchConfig cloneCfg = elasticsearchConfig.clone();
                 cloneCfg.setIndex(indexDocsCount.getIndex());
                 splits.add(
                         new ElasticsearchSourceSplit(
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
index 5a269e0737..2131bdc942 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
 
@@ -43,8 +43,8 @@ public class ElasticsearchRowSerializerTest {
         String index = "st_index";
         String primaryKey = "id";
         Map<String, Object> confMap = new HashMap<>();
-        confMap.put(SinkConfig.INDEX.key(), index);
-        confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
+        confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
+        confMap.put(ElasticsearchSinkOptions.PRIMARY_KEYS.key(), 
Arrays.asList(primaryKey));
 
         ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
@@ -83,7 +83,7 @@ public class ElasticsearchRowSerializerTest {
     public void testSerializeUpsertWithoutKey() {
         String index = "st_index";
         Map<String, Object> confMap = new HashMap<>();
-        confMap.put(SinkConfig.INDEX.key(), index);
+        confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
 
         ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
@@ -121,8 +121,8 @@ public class ElasticsearchRowSerializerTest {
         String index = "st_index";
         String primaryKey = "id";
         Map<String, Object> confMap = new HashMap<>();
-        confMap.put(SinkConfig.INDEX.key(), index);
-        confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
+        confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
+        confMap.put(ElasticsearchSinkOptions.PRIMARY_KEYS.key(), 
Arrays.asList(primaryKey));
 
         ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
@@ -159,8 +159,8 @@ public class ElasticsearchRowSerializerTest {
         String index = "st_index";
         String primaryKey = "id";
         Map<String, Object> confMap = new HashMap<>();
-        confMap.put(SinkConfig.INDEX.key(), index);
-        confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
+        confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
+        confMap.put(ElasticsearchSinkOptions.PRIMARY_KEYS.key(), 
Arrays.asList(primaryKey));
 
         ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =

Reply via email to