This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1fd854de67 [Improve] Implement ElasticSearch connector factory (#6181)
1fd854de67 is described below

commit 1fd854de67f5e392db756c75958c498cfbf92976
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jan 30 15:10:29 2024 +0800

    [Improve] Implement ElasticSearch connector factory (#6181)
---
 .../catalog/ElasticSearchCatalog.java              | 12 ++--
 .../catalog/ElasticSearchCatalogFactory.java       |  5 +-
 .../elasticsearch/client/EsRestClient.java         | 62 ++++--------------
 .../seatunnel/elasticsearch/dto/IndexInfo.java     | 23 ++-----
 .../elasticsearch/sink/ElasticsearchSink.java      | 48 +++++---------
 .../sink/ElasticsearchSinkFactory.java             |  7 ++
 .../sink/ElasticsearchSinkWriter.java              |  9 ++-
 .../elasticsearch/source/ElasticsearchSource.java  | 74 ++++++++++++----------
 .../source/ElasticsearchSourceFactory.java         | 12 ++++
 .../source/ElasticsearchSourceReader.java          | 11 ++--
 .../source/ElasticsearchSourceSplitEnumerator.java | 37 ++++-------
 .../serialize/ElasticsearchRowSerializerTest.java  | 12 ++--
 .../connector/elasticsearch/ElasticsearchIT.java   |  4 +-
 13 files changed, 133 insertions(+), 183 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 547db6c210..363e87cd55 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigUtil;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -60,22 +59,21 @@ public class ElasticSearchCatalog implements Catalog {
 
     private final String catalogName;
     private final String defaultDatabase;
-    private final Config pluginConfig;
+    private final ReadonlyConfig config;
 
     private EsRestClient esRestClient;
 
     // todo: do we need default database?
-    public ElasticSearchCatalog(
-            String catalogName, String defaultDatabase, Config 
elasticSearchConfig) {
+    public ElasticSearchCatalog(String catalogName, String defaultDatabase, 
ReadonlyConfig config) {
         this.catalogName = checkNotNull(catalogName, "catalogName cannot be 
null");
         this.defaultDatabase = defaultDatabase;
-        this.pluginConfig = checkNotNull(elasticSearchConfig, 
"elasticSearchConfig cannot be null");
+        this.config = checkNotNull(config, "elasticSearchConfig cannot be 
null");
     }
 
     @Override
     public void open() throws CatalogException {
         try {
-            esRestClient = EsRestClient.createInstance(pluginConfig);
+            esRestClient = EsRestClient.createInstance(config);
             ElasticsearchClusterInfo elasticsearchClusterInfo = 
esRestClient.getClusterInfo();
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug(
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
index df6624241f..76623f84fc 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -32,8 +30,7 @@ public class ElasticSearchCatalogFactory implements 
CatalogFactory {
 
     @Override
     public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
-        Config config = options.toConfig();
-        return new ElasticSearchCatalog(catalogName, "", config);
+        return new ElasticSearchCatalog(catalogName, "", options);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index ed56803238..50c47d1334 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -21,8 +21,8 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
@@ -78,60 +78,24 @@ public class EsRestClient {
         this.restClient = restClient;
     }
 
-    public static EsRestClient createInstance(Config pluginConfig) {
-        List<String> hosts = 
pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS.key());
-        Optional<String> username = Optional.empty();
-        Optional<String> password = Optional.empty();
-        if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME.key())) {
-            username =
-                    
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.USERNAME.key()));
-            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD.key())) {
-                password =
-                        Optional.of(
-                                
pluginConfig.getString(EsClusterConnectionConfig.PASSWORD.key()));
-            }
-        }
+    public static EsRestClient createInstance(ReadonlyConfig config) {
+        List<String> hosts = config.get(EsClusterConnectionConfig.HOSTS);
+        Optional<String> username = 
config.getOptional(EsClusterConnectionConfig.USERNAME);
+        Optional<String> password = 
config.getOptional(EsClusterConnectionConfig.PASSWORD);
         Optional<String> keystorePath = Optional.empty();
         Optional<String> keystorePassword = Optional.empty();
         Optional<String> truststorePath = Optional.empty();
         Optional<String> truststorePassword = Optional.empty();
-        boolean tlsVerifyCertificate =
-                
EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.defaultValue();
-        if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key())) {
-            tlsVerifyCertificate =
-                    
pluginConfig.getBoolean(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key());
-        }
+        boolean tlsVerifyCertificate = 
config.get(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE);
         if (tlsVerifyCertificate) {
-            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_KEY_STORE_PATH.key())) {
-                keystorePath =
-                        Optional.of(
-                                pluginConfig.getString(
-                                        
EsClusterConnectionConfig.TLS_KEY_STORE_PATH.key()));
-            }
-            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key())) {
-                keystorePassword =
-                        Optional.of(
-                                pluginConfig.getString(
-                                        
EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key()));
-            }
-            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key())) {
-                truststorePath =
-                        Optional.of(
-                                pluginConfig.getString(
-                                        
EsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key()));
-            }
-            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key()))
 {
-                truststorePassword =
-                        Optional.of(
-                                pluginConfig.getString(
-                                        
EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key()));
-            }
-        }
-        boolean tlsVerifyHostnames = 
EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.defaultValue();
-        if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key())) {
-            tlsVerifyHostnames =
-                    
pluginConfig.getBoolean(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key());
+            keystorePath = 
config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PATH);
+            keystorePassword = 
config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD);
+            truststorePath = 
config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH);
+            truststorePassword =
+                    
config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD);
         }
+
+        boolean tlsVerifyHostnames = 
config.get(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME);
         return createInstance(
                 hosts,
                 username,
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
index 53bf3e876b..cb10ed58c0 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
 
 import lombok.Data;
@@ -32,20 +31,12 @@ public class IndexInfo {
     private String[] primaryKeys;
     private String keyDelimiter;
 
-    public IndexInfo(Config pluginConfig) {
-        index = pluginConfig.getString(SinkConfig.INDEX.key());
-        if (pluginConfig.hasPath(SinkConfig.INDEX_TYPE.key())) {
-            type = pluginConfig.getString(SinkConfig.INDEX_TYPE.key());
-        }
-        if (pluginConfig.hasPath(SinkConfig.PRIMARY_KEYS.key())) {
-            primaryKeys =
-                    pluginConfig
-                            .getStringList(SinkConfig.PRIMARY_KEYS.key())
-                            .toArray(new String[0]);
-        }
-        keyDelimiter = SinkConfig.KEY_DELIMITER.defaultValue();
-        if (pluginConfig.hasPath(SinkConfig.KEY_DELIMITER.key())) {
-            keyDelimiter = 
pluginConfig.getString(SinkConfig.KEY_DELIMITER.key());
+    public IndexInfo(ReadonlyConfig config) {
+        index = config.get(SinkConfig.INDEX);
+        type = config.get(SinkConfig.INDEX_TYPE);
+        if (config.getOptional(SinkConfig.PRIMARY_KEYS).isPresent()) {
+            primaryKeys = config.get(SinkConfig.PRIMARY_KEYS).toArray(new 
String[0]);
         }
+        keyDelimiter = config.get(SinkConfig.KEY_DELIMITER);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index b4c67417d0..79862879f9 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -17,9 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
@@ -29,10 +26,10 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
@@ -55,39 +52,30 @@ public class ElasticsearchSink
                         ElasticsearchAggregatedCommitInfo>,
                 SupportSaveMode {
 
-    private Config pluginConfig;
-    private SeaTunnelRowType seaTunnelRowType;
+    private ReadonlyConfig config;
+    private CatalogTable catalogTable;
 
-    private int maxBatchSize = MAX_BATCH_SIZE.defaultValue();
+    private final int maxBatchSize;
 
-    private int maxRetryCount = MAX_RETRY_COUNT.defaultValue();
+    private final int maxRetryCount;
 
-    @Override
-    public String getPluginName() {
-        return "Elasticsearch";
+    public ElasticsearchSink(ReadonlyConfig config, CatalogTable catalogTable) 
{
+        this.config = config;
+        this.catalogTable = catalogTable;
+        maxBatchSize = config.get(MAX_BATCH_SIZE);
+        maxRetryCount = config.get(MAX_RETRY_COUNT);
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        if (pluginConfig.hasPath(MAX_BATCH_SIZE.key())) {
-            maxBatchSize = pluginConfig.getInt(MAX_BATCH_SIZE.key());
-        }
-        if (pluginConfig.hasPath(MAX_RETRY_COUNT.key())) {
-            maxRetryCount = pluginConfig.getInt(MAX_RETRY_COUNT.key());
-        }
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public String getPluginName() {
+        return "Elasticsearch";
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, 
ElasticsearchSinkState> createWriter(
             SinkWriter.Context context) {
         return new ElasticsearchSinkWriter(
-                context, seaTunnelRowType, pluginConfig, maxBatchSize, 
maxRetryCount);
+                context, catalogTable.getSeaTunnelRowType(), config, 
maxBatchSize, maxRetryCount);
     }
 
     @Override
@@ -100,13 +88,11 @@ public class ElasticsearchSink
         if (catalogFactory == null) {
             return Optional.empty();
         }
-        ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(pluginConfig);
-        Catalog catalog =
-                
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), 
readonlyConfig);
-        SchemaSaveMode schemaSaveMode = 
readonlyConfig.get(SinkConfig.SCHEMA_SAVE_MODE);
-        DataSaveMode dataSaveMode = 
readonlyConfig.get(SinkConfig.DATA_SAVE_MODE);
+        Catalog catalog = 
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
+        SchemaSaveMode schemaSaveMode = 
config.get(SinkConfig.SCHEMA_SAVE_MODE);
+        DataSaveMode dataSaveMode = config.get(SinkConfig.DATA_SAVE_MODE);
 
-        TablePath tablePath = TablePath.of("", 
readonlyConfig.get(SinkConfig.INDEX));
+        TablePath tablePath = TablePath.of("", config.get(SinkConfig.INDEX));
         catalog.open();
         return Optional.of(
                 new DefaultSaveModeHandler(
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index 9ecf44d479..97548e3fdb 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -18,8 +18,10 @@
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
 
 import com.google.auto.service.AutoService;
@@ -67,4 +69,9 @@ public class ElasticsearchSinkFactory implements 
TableSinkFactory {
                         TLS_TRUST_STORE_PASSWORD)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new ElasticsearchSink(context.getOptions(), 
context.getCatalogTable());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 2c5a5a5980..35ed49d498 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -63,14 +62,14 @@ public class ElasticsearchSinkWriter
     public ElasticsearchSinkWriter(
             SinkWriter.Context context,
             SeaTunnelRowType seaTunnelRowType,
-            Config pluginConfig,
+            ReadonlyConfig config,
             int maxBatchSize,
             int maxRetryCount) {
         this.context = context;
         this.maxBatchSize = maxBatchSize;
 
-        IndexInfo indexInfo = new IndexInfo(pluginConfig);
-        esRestClient = EsRestClient.createInstance(pluginConfig);
+        IndexInfo indexInfo = new IndexInfo(config);
+        esRestClient = EsRestClient.createInstance(config);
         this.seaTunnelRowSerializer =
                 new ElasticsearchRowSerializer(
                         esRestClient.getClusterInfo(), indexInfo, 
seaTunnelRowType);
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index c7891745f5..e99e66e420 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -17,96 +17,106 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchDataTypeConvertor;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
 
-import com.google.auto.service.AutoService;
-
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-@AutoService(SeaTunnelSource.class)
 public class ElasticsearchSource
         implements SeaTunnelSource<
                         SeaTunnelRow, ElasticsearchSourceSplit, 
ElasticsearchSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
 
-    private Config pluginConfig;
+    private final ReadonlyConfig config;
 
-    private SeaTunnelRowType rowTypeInfo;
+    private CatalogTable catalogTable;
 
     private List<String> source;
 
-    @Override
-    public String getPluginName() {
-        return "Elasticsearch";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
+    public ElasticsearchSource(ReadonlyConfig config) {
+        this.config = config;
+        if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
             // todo: We need to remove the schema in ES.
-            rowTypeInfo = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-            source = Arrays.asList(rowTypeInfo.getFieldNames());
+            catalogTable = CatalogTableUtil.buildWithConfig(config);
+            source = 
Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
         } else {
-            source = pluginConfig.getStringList(SourceConfig.SOURCE.key());
-            EsRestClient esRestClient = 
EsRestClient.createInstance(this.pluginConfig);
+            source = config.get(SourceConfig.SOURCE);
+            EsRestClient esRestClient = EsRestClient.createInstance(config);
             Map<String, String> esFieldType =
-                    esRestClient.getFieldTypeMapping(
-                            pluginConfig.getString(SourceConfig.INDEX.key()), 
source);
+                    
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
             esRestClient.close();
-            SeaTunnelDataType[] fieldTypes = new 
SeaTunnelDataType[source.size()];
+            SeaTunnelDataType<?>[] fieldTypes = new 
SeaTunnelDataType[source.size()];
             ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor =
                     new ElasticSearchDataTypeConvertor();
             for (int i = 0; i < source.size(); i++) {
                 String esType = esFieldType.get(source.get(i));
-                SeaTunnelDataType seaTunnelDataType =
+                SeaTunnelDataType<?> seaTunnelDataType =
                         
elasticSearchDataTypeConvertor.toSeaTunnelType(source.get(i), esType);
                 fieldTypes[i] = seaTunnelDataType;
             }
-            rowTypeInfo = new SeaTunnelRowType(source.toArray(new String[0]), 
fieldTypes);
+            TableSchema.Builder builder = TableSchema.builder();
+            for (int i = 0; i < source.size(); i++) {
+                builder.column(
+                        PhysicalColumn.of(source.get(i), fieldTypes[i], 0, 
true, null, null));
+            }
+            catalogTable =
+                    CatalogTable.of(
+                            TableIdentifier.of(
+                                    "elasticsearch", null, 
config.get(SourceConfig.INDEX)),
+                            builder.build(),
+                            Collections.emptyMap(),
+                            Collections.emptyList(),
+                            "");
         }
     }
 
+    @Override
+    public String getPluginName() {
+        return "Elasticsearch";
+    }
+
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return this.rowTypeInfo;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
     public SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> createReader(
             SourceReader.Context readerContext) {
-        return new ElasticsearchSourceReader(readerContext, pluginConfig, 
rowTypeInfo);
+        return new ElasticsearchSourceReader(
+                readerContext, config, catalogTable.getSeaTunnelRowType());
     }
 
     @Override
     public SourceSplitEnumerator<ElasticsearchSourceSplit, 
ElasticsearchSourceState>
             createEnumerator(
                     SourceSplitEnumerator.Context<ElasticsearchSourceSplit> 
enumeratorContext) {
-        return new ElasticsearchSourceSplitEnumerator(enumeratorContext, 
pluginConfig, source);
+        return new ElasticsearchSourceSplitEnumerator(enumeratorContext, 
config, source);
     }
 
     @Override
@@ -115,6 +125,6 @@ public class ElasticsearchSource
                     SourceSplitEnumerator.Context<ElasticsearchSourceSplit> 
enumeratorContext,
                     ElasticsearchSourceState sourceState) {
         return new ElasticsearchSourceSplitEnumerator(
-                enumeratorContext, sourceState, pluginConfig, source);
+                enumeratorContext, sourceState, config, source);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index 26ebb8049c..6ff08b7d06 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -19,12 +19,17 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
@@ -67,6 +72,13 @@ public class ElasticsearchSourceFactory implements 
TableSourceFactory {
                 .build();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>) new 
ElasticsearchSource(context.getOptions());
+    }
+
     @Override
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return ElasticsearchSource.class;
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index 5c32688940..7d2398816a 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -45,7 +44,7 @@ public class ElasticsearchSourceReader
 
     SourceReader.Context context;
 
-    private Config pluginConfig;
+    private final ReadonlyConfig config;
 
     private EsRestClient esRestClient;
 
@@ -57,15 +56,15 @@ public class ElasticsearchSourceReader
     private final long pollNextWaitTime = 1000L;
 
     public ElasticsearchSourceReader(
-            SourceReader.Context context, Config pluginConfig, 
SeaTunnelRowType rowTypeInfo) {
+            SourceReader.Context context, ReadonlyConfig config, 
SeaTunnelRowType rowTypeInfo) {
         this.context = context;
-        this.pluginConfig = pluginConfig;
+        this.config = config;
         this.deserializer = new DefaultSeaTunnelRowDeserializer(rowTypeInfo);
     }
 
     @Override
     public void open() {
-        esRestClient = EsRestClient.createInstance(this.pluginConfig);
+        esRestClient = EsRestClient.createInstance(this.config);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index 8c9eedc2e4..107aaac322 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
@@ -44,9 +43,9 @@ import java.util.stream.Collectors;
 public class ElasticsearchSourceSplitEnumerator
         implements SourceSplitEnumerator<ElasticsearchSourceSplit, 
ElasticsearchSourceState> {
 
-    private SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context;
+    private final SourceSplitEnumerator.Context<ElasticsearchSourceSplit> 
context;
 
-    private Config pluginConfig;
+    private final ReadonlyConfig config;
 
     private EsRestClient esRestClient;
 
@@ -54,24 +53,24 @@ public class ElasticsearchSourceSplitEnumerator
 
     private Map<Integer, List<ElasticsearchSourceSplit>> pendingSplit;
 
-    private List<String> source;
+    private final List<String> source;
 
     private volatile boolean shouldEnumerate;
 
     public ElasticsearchSourceSplitEnumerator(
             SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
-            Config pluginConfig,
+            ReadonlyConfig config,
             List<String> source) {
-        this(context, null, pluginConfig, source);
+        this(context, null, config, source);
     }
 
     public ElasticsearchSourceSplitEnumerator(
             SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
             ElasticsearchSourceState sourceState,
-            Config pluginConfig,
+            ReadonlyConfig config,
             List<String> source) {
         this.context = context;
-        this.pluginConfig = pluginConfig;
+        this.config = config;
         this.pendingSplit = new HashMap<>();
         this.shouldEnumerate = sourceState == null;
         if (sourceState != null) {
@@ -83,7 +82,7 @@ public class ElasticsearchSourceSplitEnumerator
 
     @Override
     public void open() {
-        esRestClient = EsRestClient.createInstance(pluginConfig);
+        esRestClient = EsRestClient.createInstance(config);
     }
 
     @Override
@@ -141,21 +140,11 @@ public class ElasticsearchSourceSplitEnumerator
 
     private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
         List<ElasticsearchSourceSplit> splits = new ArrayList<>();
-        String scrollTime = SourceConfig.SCROLL_TIME.defaultValue();
-        if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME.key())) {
-            scrollTime = 
pluginConfig.getString(SourceConfig.SCROLL_TIME.key());
-        }
-        int scrollSize = SourceConfig.SCROLL_SIZE.defaultValue();
-        if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE.key())) {
-            scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE.key());
-        }
-        Map query = SourceConfig.QUERY.defaultValue();
-        if (pluginConfig.hasPath(SourceConfig.QUERY.key())) {
-            query = (Map) pluginConfig.getAnyRef(SourceConfig.QUERY.key());
-        }
-
+        String scrollTime = config.get(SourceConfig.SCROLL_TIME);
+        int scrollSize = config.get(SourceConfig.SCROLL_SIZE);
+        Map query = config.get(SourceConfig.QUERY);
         List<IndexDocsCount> indexDocsCounts =
-                
esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX.key()));
+                esRestClient.getIndexDocsCount(config.get(SourceConfig.INDEX));
         indexDocsCounts =
                 indexDocsCounts.stream()
                         .filter(x -> x.getDocsCount() != null && 
x.getDocsCount() > 0)
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
index f888a8575f..6efa5bba4c 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
@@ -17,9 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -48,7 +46,7 @@ public class ElasticsearchRowSerializerTest {
         confMap.put(SinkConfig.INDEX.key(), index);
         confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
 
-        Config pluginConf = ConfigFactory.parseMap(confMap);
+        ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
                 
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
         IndexInfo indexInfo = new IndexInfo(pluginConf);
@@ -87,7 +85,7 @@ public class ElasticsearchRowSerializerTest {
         Map<String, Object> confMap = new HashMap<>();
         confMap.put(SinkConfig.INDEX.key(), index);
 
-        Config pluginConf = ConfigFactory.parseMap(confMap);
+        ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
                 
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
         IndexInfo indexInfo = new IndexInfo(pluginConf);
@@ -126,7 +124,7 @@ public class ElasticsearchRowSerializerTest {
         confMap.put(SinkConfig.INDEX.key(), index);
         confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
 
-        Config pluginConf = ConfigFactory.parseMap(confMap);
+        ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
                 
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
         IndexInfo indexInfo = new IndexInfo(pluginConf);
@@ -164,7 +162,7 @@ public class ElasticsearchRowSerializerTest {
         confMap.put(SinkConfig.INDEX.key(), index);
         confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
 
-        Config pluginConf = ConfigFactory.parseMap(confMap);
+        ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
         ElasticsearchClusterInfo clusterInfo =
                 
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
         IndexInfo indexInfo = new IndexInfo(pluginConf);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index dd810c8225..ddd106451f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.e2e.connector.elasticsearch;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
@@ -272,7 +272,7 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         configMap.put("tls_verify_hostname", false);
         configMap.put("index_type", "st");
         final ElasticSearchCatalog elasticSearchCatalog =
-                new ElasticSearchCatalog("Elasticsearch", "", 
ConfigFactory.parseMap(configMap));
+                new ElasticSearchCatalog("Elasticsearch", "", 
ReadonlyConfig.fromMap(configMap));
         elasticSearchCatalog.open();
         TablePath tablePath = TablePath.of("", "st_index3");
         // index exists


Reply via email to