This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1fd854de67 [Improve] Implement ElasticSearch connector factory (#6181)
1fd854de67 is described below
commit 1fd854de67f5e392db756c75958c498cfbf92976
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jan 30 15:10:29 2024 +0800
[Improve] Implement ElasticSearch connector factory (#6181)
---
.../catalog/ElasticSearchCatalog.java | 12 ++--
.../catalog/ElasticSearchCatalogFactory.java | 5 +-
.../elasticsearch/client/EsRestClient.java | 62 ++++--------------
.../seatunnel/elasticsearch/dto/IndexInfo.java | 23 ++-----
.../elasticsearch/sink/ElasticsearchSink.java | 48 +++++---------
.../sink/ElasticsearchSinkFactory.java | 7 ++
.../sink/ElasticsearchSinkWriter.java | 9 ++-
.../elasticsearch/source/ElasticsearchSource.java | 74 ++++++++++++----------
.../source/ElasticsearchSourceFactory.java | 12 ++++
.../source/ElasticsearchSourceReader.java | 11 ++--
.../source/ElasticsearchSourceSplitEnumerator.java | 37 ++++-------
.../serialize/ElasticsearchRowSerializerTest.java | 12 ++--
.../connector/elasticsearch/ElasticsearchIT.java | 4 +-
13 files changed, 133 insertions(+), 183 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 547db6c210..363e87cd55 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigUtil;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -60,22 +59,21 @@ public class ElasticSearchCatalog implements Catalog {
private final String catalogName;
private final String defaultDatabase;
- private final Config pluginConfig;
+ private final ReadonlyConfig config;
private EsRestClient esRestClient;
// todo: do we need default database?
- public ElasticSearchCatalog(
- String catalogName, String defaultDatabase, Config
elasticSearchConfig) {
+ public ElasticSearchCatalog(String catalogName, String defaultDatabase,
ReadonlyConfig config) {
this.catalogName = checkNotNull(catalogName, "catalogName cannot be
null");
this.defaultDatabase = defaultDatabase;
- this.pluginConfig = checkNotNull(elasticSearchConfig,
"elasticSearchConfig cannot be null");
+ this.config = checkNotNull(config, "elasticSearchConfig cannot be
null");
}
@Override
public void open() throws CatalogException {
try {
- esRestClient = EsRestClient.createInstance(pluginConfig);
+ esRestClient = EsRestClient.createInstance(config);
ElasticsearchClusterInfo elasticsearchClusterInfo =
esRestClient.getClusterInfo();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
index df6624241f..76623f84fc 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -32,8 +30,7 @@ public class ElasticSearchCatalogFactory implements
CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
- Config config = options.toConfig();
- return new ElasticSearchCatalog(catalogName, "", config);
+ return new ElasticSearchCatalog(catalogName, "", options);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index ed56803238..50c47d1334 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -21,8 +21,8 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
@@ -78,60 +78,24 @@ public class EsRestClient {
this.restClient = restClient;
}
- public static EsRestClient createInstance(Config pluginConfig) {
- List<String> hosts =
pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS.key());
- Optional<String> username = Optional.empty();
- Optional<String> password = Optional.empty();
- if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME.key())) {
- username =
-
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.USERNAME.key()));
- if
(pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD.key())) {
- password =
- Optional.of(
-
pluginConfig.getString(EsClusterConnectionConfig.PASSWORD.key()));
- }
- }
+ public static EsRestClient createInstance(ReadonlyConfig config) {
+ List<String> hosts = config.get(EsClusterConnectionConfig.HOSTS);
+ Optional<String> username =
config.getOptional(EsClusterConnectionConfig.USERNAME);
+ Optional<String> password =
config.getOptional(EsClusterConnectionConfig.PASSWORD);
Optional<String> keystorePath = Optional.empty();
Optional<String> keystorePassword = Optional.empty();
Optional<String> truststorePath = Optional.empty();
Optional<String> truststorePassword = Optional.empty();
- boolean tlsVerifyCertificate =
-
EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.defaultValue();
- if
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key())) {
- tlsVerifyCertificate =
-
pluginConfig.getBoolean(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key());
- }
+ boolean tlsVerifyCertificate =
config.get(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE);
if (tlsVerifyCertificate) {
- if
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_KEY_STORE_PATH.key())) {
- keystorePath =
- Optional.of(
- pluginConfig.getString(
-
EsClusterConnectionConfig.TLS_KEY_STORE_PATH.key()));
- }
- if
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key())) {
- keystorePassword =
- Optional.of(
- pluginConfig.getString(
-
EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key()));
- }
- if
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key())) {
- truststorePath =
- Optional.of(
- pluginConfig.getString(
-
EsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key()));
- }
- if
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key()))
{
- truststorePassword =
- Optional.of(
- pluginConfig.getString(
-
EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key()));
- }
- }
- boolean tlsVerifyHostnames =
EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.defaultValue();
- if
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key())) {
- tlsVerifyHostnames =
-
pluginConfig.getBoolean(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key());
+ keystorePath =
config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PATH);
+ keystorePassword =
config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD);
+ truststorePath =
config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH);
+ truststorePassword =
+
config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD);
}
+
+ boolean tlsVerifyHostnames =
config.get(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME);
return createInstance(
hosts,
username,
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
index 53bf3e876b..cb10ed58c0 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
import lombok.Data;
@@ -32,20 +31,12 @@ public class IndexInfo {
private String[] primaryKeys;
private String keyDelimiter;
- public IndexInfo(Config pluginConfig) {
- index = pluginConfig.getString(SinkConfig.INDEX.key());
- if (pluginConfig.hasPath(SinkConfig.INDEX_TYPE.key())) {
- type = pluginConfig.getString(SinkConfig.INDEX_TYPE.key());
- }
- if (pluginConfig.hasPath(SinkConfig.PRIMARY_KEYS.key())) {
- primaryKeys =
- pluginConfig
- .getStringList(SinkConfig.PRIMARY_KEYS.key())
- .toArray(new String[0]);
- }
- keyDelimiter = SinkConfig.KEY_DELIMITER.defaultValue();
- if (pluginConfig.hasPath(SinkConfig.KEY_DELIMITER.key())) {
- keyDelimiter =
pluginConfig.getString(SinkConfig.KEY_DELIMITER.key());
+ public IndexInfo(ReadonlyConfig config) {
+ index = config.get(SinkConfig.INDEX);
+ type = config.get(SinkConfig.INDEX_TYPE);
+ if (config.getOptional(SinkConfig.PRIMARY_KEYS).isPresent()) {
+ primaryKeys = config.get(SinkConfig.PRIMARY_KEYS).toArray(new
String[0]);
}
+ keyDelimiter = config.get(SinkConfig.KEY_DELIMITER);
}
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index b4c67417d0..79862879f9 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -17,9 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
@@ -29,10 +26,10 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
@@ -55,39 +52,30 @@ public class ElasticsearchSink
ElasticsearchAggregatedCommitInfo>,
SupportSaveMode {
- private Config pluginConfig;
- private SeaTunnelRowType seaTunnelRowType;
+ private ReadonlyConfig config;
+ private CatalogTable catalogTable;
- private int maxBatchSize = MAX_BATCH_SIZE.defaultValue();
+ private final int maxBatchSize;
- private int maxRetryCount = MAX_RETRY_COUNT.defaultValue();
+ private final int maxRetryCount;
- @Override
- public String getPluginName() {
- return "Elasticsearch";
+ public ElasticsearchSink(ReadonlyConfig config, CatalogTable catalogTable)
{
+ this.config = config;
+ this.catalogTable = catalogTable;
+ maxBatchSize = config.get(MAX_BATCH_SIZE);
+ maxRetryCount = config.get(MAX_RETRY_COUNT);
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- if (pluginConfig.hasPath(MAX_BATCH_SIZE.key())) {
- maxBatchSize = pluginConfig.getInt(MAX_BATCH_SIZE.key());
- }
- if (pluginConfig.hasPath(MAX_RETRY_COUNT.key())) {
- maxRetryCount = pluginConfig.getInt(MAX_RETRY_COUNT.key());
- }
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public String getPluginName() {
+ return "Elasticsearch";
}
@Override
public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo,
ElasticsearchSinkState> createWriter(
SinkWriter.Context context) {
return new ElasticsearchSinkWriter(
- context, seaTunnelRowType, pluginConfig, maxBatchSize,
maxRetryCount);
+ context, catalogTable.getSeaTunnelRowType(), config,
maxBatchSize, maxRetryCount);
}
@Override
@@ -100,13 +88,11 @@ public class ElasticsearchSink
if (catalogFactory == null) {
return Optional.empty();
}
- ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(pluginConfig);
- Catalog catalog =
-
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(),
readonlyConfig);
- SchemaSaveMode schemaSaveMode =
readonlyConfig.get(SinkConfig.SCHEMA_SAVE_MODE);
- DataSaveMode dataSaveMode =
readonlyConfig.get(SinkConfig.DATA_SAVE_MODE);
+ Catalog catalog =
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
+ SchemaSaveMode schemaSaveMode =
config.get(SinkConfig.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode = config.get(SinkConfig.DATA_SAVE_MODE);
- TablePath tablePath = TablePath.of("",
readonlyConfig.get(SinkConfig.INDEX));
+ TablePath tablePath = TablePath.of("", config.get(SinkConfig.INDEX));
catalog.open();
return Optional.of(
new DefaultSaveModeHandler(
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index 9ecf44d479..97548e3fdb 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -18,8 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
import com.google.auto.service.AutoService;
@@ -67,4 +69,9 @@ public class ElasticsearchSinkFactory implements
TableSinkFactory {
TLS_TRUST_STORE_PASSWORD)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new ElasticsearchSink(context.getOptions(),
context.getCatalogTable());
+ }
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 2c5a5a5980..35ed49d498 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -63,14 +62,14 @@ public class ElasticsearchSinkWriter
public ElasticsearchSinkWriter(
SinkWriter.Context context,
SeaTunnelRowType seaTunnelRowType,
- Config pluginConfig,
+ ReadonlyConfig config,
int maxBatchSize,
int maxRetryCount) {
this.context = context;
this.maxBatchSize = maxBatchSize;
- IndexInfo indexInfo = new IndexInfo(pluginConfig);
- esRestClient = EsRestClient.createInstance(pluginConfig);
+ IndexInfo indexInfo = new IndexInfo(config);
+ esRestClient = EsRestClient.createInstance(config);
this.seaTunnelRowSerializer =
new ElasticsearchRowSerializer(
esRestClient.getClusterInfo(), indexInfo,
seaTunnelRowType);
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index c7891745f5..e99e66e420 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -17,96 +17,106 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchDataTypeConvertor;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
-import com.google.auto.service.AutoService;
-
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-@AutoService(SeaTunnelSource.class)
public class ElasticsearchSource
implements SeaTunnelSource<
SeaTunnelRow, ElasticsearchSourceSplit,
ElasticsearchSourceState>,
SupportParallelism,
SupportColumnProjection {
- private Config pluginConfig;
+ private final ReadonlyConfig config;
- private SeaTunnelRowType rowTypeInfo;
+ private CatalogTable catalogTable;
private List<String> source;
- @Override
- public String getPluginName() {
- return "Elasticsearch";
- }
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
+ public ElasticsearchSource(ReadonlyConfig config) {
+ this.config = config;
+ if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
// todo: We need to remove the schema in ES.
- rowTypeInfo =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- source = Arrays.asList(rowTypeInfo.getFieldNames());
+ catalogTable = CatalogTableUtil.buildWithConfig(config);
+ source =
Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
} else {
- source = pluginConfig.getStringList(SourceConfig.SOURCE.key());
- EsRestClient esRestClient =
EsRestClient.createInstance(this.pluginConfig);
+ source = config.get(SourceConfig.SOURCE);
+ EsRestClient esRestClient = EsRestClient.createInstance(config);
Map<String, String> esFieldType =
- esRestClient.getFieldTypeMapping(
- pluginConfig.getString(SourceConfig.INDEX.key()),
source);
+
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
esRestClient.close();
- SeaTunnelDataType[] fieldTypes = new
SeaTunnelDataType[source.size()];
+ SeaTunnelDataType<?>[] fieldTypes = new
SeaTunnelDataType[source.size()];
ElasticSearchDataTypeConvertor elasticSearchDataTypeConvertor =
new ElasticSearchDataTypeConvertor();
for (int i = 0; i < source.size(); i++) {
String esType = esFieldType.get(source.get(i));
- SeaTunnelDataType seaTunnelDataType =
+ SeaTunnelDataType<?> seaTunnelDataType =
elasticSearchDataTypeConvertor.toSeaTunnelType(source.get(i), esType);
fieldTypes[i] = seaTunnelDataType;
}
- rowTypeInfo = new SeaTunnelRowType(source.toArray(new String[0]),
fieldTypes);
+ TableSchema.Builder builder = TableSchema.builder();
+ for (int i = 0; i < source.size(); i++) {
+ builder.column(
+ PhysicalColumn.of(source.get(i), fieldTypes[i], 0,
true, null, null));
+ }
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of(
+ "elasticsearch", null,
config.get(SourceConfig.INDEX)),
+ builder.build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "");
}
}
+ @Override
+ public String getPluginName() {
+ return "Elasticsearch";
+ }
+
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return this.rowTypeInfo;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
public SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> createReader(
SourceReader.Context readerContext) {
- return new ElasticsearchSourceReader(readerContext, pluginConfig,
rowTypeInfo);
+ return new ElasticsearchSourceReader(
+ readerContext, config, catalogTable.getSeaTunnelRowType());
}
@Override
public SourceSplitEnumerator<ElasticsearchSourceSplit,
ElasticsearchSourceState>
createEnumerator(
SourceSplitEnumerator.Context<ElasticsearchSourceSplit>
enumeratorContext) {
- return new ElasticsearchSourceSplitEnumerator(enumeratorContext,
pluginConfig, source);
+ return new ElasticsearchSourceSplitEnumerator(enumeratorContext,
config, source);
}
@Override
@@ -115,6 +125,6 @@ public class ElasticsearchSource
SourceSplitEnumerator.Context<ElasticsearchSourceSplit>
enumeratorContext,
ElasticsearchSourceState sourceState) {
return new ElasticsearchSourceSplitEnumerator(
- enumeratorContext, sourceState, pluginConfig, source);
+ enumeratorContext, sourceState, config, source);
}
}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index 26ebb8049c..6ff08b7d06 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -19,12 +19,17 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
@@ -67,6 +72,13 @@ public class ElasticsearchSourceFactory implements
TableSourceFactory {
.build();
}
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>) new
ElasticsearchSource(context.getOptions());
+ }
+
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return ElasticsearchSource.class;
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index 5c32688940..7d2398816a 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -45,7 +44,7 @@ public class ElasticsearchSourceReader
SourceReader.Context context;
- private Config pluginConfig;
+ private final ReadonlyConfig config;
private EsRestClient esRestClient;
@@ -57,15 +56,15 @@ public class ElasticsearchSourceReader
private final long pollNextWaitTime = 1000L;
public ElasticsearchSourceReader(
- SourceReader.Context context, Config pluginConfig,
SeaTunnelRowType rowTypeInfo) {
+ SourceReader.Context context, ReadonlyConfig config,
SeaTunnelRowType rowTypeInfo) {
this.context = context;
- this.pluginConfig = pluginConfig;
+ this.config = config;
this.deserializer = new DefaultSeaTunnelRowDeserializer(rowTypeInfo);
}
@Override
public void open() {
- esRestClient = EsRestClient.createInstance(this.pluginConfig);
+ esRestClient = EsRestClient.createInstance(this.config);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index 8c9eedc2e4..107aaac322 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
@@ -44,9 +43,9 @@ import java.util.stream.Collectors;
public class ElasticsearchSourceSplitEnumerator
implements SourceSplitEnumerator<ElasticsearchSourceSplit,
ElasticsearchSourceState> {
- private SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context;
+ private final SourceSplitEnumerator.Context<ElasticsearchSourceSplit>
context;
- private Config pluginConfig;
+ private final ReadonlyConfig config;
private EsRestClient esRestClient;
@@ -54,24 +53,24 @@ public class ElasticsearchSourceSplitEnumerator
private Map<Integer, List<ElasticsearchSourceSplit>> pendingSplit;
- private List<String> source;
+ private final List<String> source;
private volatile boolean shouldEnumerate;
public ElasticsearchSourceSplitEnumerator(
SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
- Config pluginConfig,
+ ReadonlyConfig config,
List<String> source) {
- this(context, null, pluginConfig, source);
+ this(context, null, config, source);
}
public ElasticsearchSourceSplitEnumerator(
SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
ElasticsearchSourceState sourceState,
- Config pluginConfig,
+ ReadonlyConfig config,
List<String> source) {
this.context = context;
- this.pluginConfig = pluginConfig;
+ this.config = config;
this.pendingSplit = new HashMap<>();
this.shouldEnumerate = sourceState == null;
if (sourceState != null) {
@@ -83,7 +82,7 @@ public class ElasticsearchSourceSplitEnumerator
@Override
public void open() {
- esRestClient = EsRestClient.createInstance(pluginConfig);
+ esRestClient = EsRestClient.createInstance(config);
}
@Override
@@ -141,21 +140,11 @@ public class ElasticsearchSourceSplitEnumerator
private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
List<ElasticsearchSourceSplit> splits = new ArrayList<>();
- String scrollTime = SourceConfig.SCROLL_TIME.defaultValue();
- if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME.key())) {
- scrollTime = pluginConfig.getString(SourceConfig.SCROLL_TIME.key());
- }
- int scrollSize = SourceConfig.SCROLL_SIZE.defaultValue();
- if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE.key())) {
- scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE.key());
- }
- Map query = SourceConfig.QUERY.defaultValue();
- if (pluginConfig.hasPath(SourceConfig.QUERY.key())) {
- query = (Map) pluginConfig.getAnyRef(SourceConfig.QUERY.key());
- }
-
+ String scrollTime = config.get(SourceConfig.SCROLL_TIME);
+ int scrollSize = config.get(SourceConfig.SCROLL_SIZE);
+ Map query = config.get(SourceConfig.QUERY);
List<IndexDocsCount> indexDocsCounts =
- esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX.key()));
+ esRestClient.getIndexDocsCount(config.get(SourceConfig.INDEX));
indexDocsCounts =
indexDocsCounts.stream()
.filter(x -> x.getDocsCount() != null &&
x.getDocsCount() > 0)
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
index f888a8575f..6efa5bba4c 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
@@ -17,9 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -48,7 +46,7 @@ public class ElasticsearchRowSerializerTest {
confMap.put(SinkConfig.INDEX.key(), index);
confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
- Config pluginConf = ConfigFactory.parseMap(confMap);
+ ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
IndexInfo indexInfo = new IndexInfo(pluginConf);
@@ -87,7 +85,7 @@ public class ElasticsearchRowSerializerTest {
Map<String, Object> confMap = new HashMap<>();
confMap.put(SinkConfig.INDEX.key(), index);
- Config pluginConf = ConfigFactory.parseMap(confMap);
+ ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
IndexInfo indexInfo = new IndexInfo(pluginConf);
@@ -126,7 +124,7 @@ public class ElasticsearchRowSerializerTest {
confMap.put(SinkConfig.INDEX.key(), index);
confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
- Config pluginConf = ConfigFactory.parseMap(confMap);
+ ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
IndexInfo indexInfo = new IndexInfo(pluginConf);
@@ -164,7 +162,7 @@ public class ElasticsearchRowSerializerTest {
confMap.put(SinkConfig.INDEX.key(), index);
confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
- Config pluginConf = ConfigFactory.parseMap(confMap);
+ ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
IndexInfo indexInfo = new IndexInfo(pluginConf);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index dd810c8225..ddd106451f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.e2e.connector.elasticsearch;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
@@ -272,7 +272,7 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
configMap.put("tls_verify_hostname", false);
configMap.put("index_type", "st");
final ElasticSearchCatalog elasticSearchCatalog =
- new ElasticSearchCatalog("Elasticsearch", "", ConfigFactory.parseMap(configMap));
+ new ElasticSearchCatalog("Elasticsearch", "", ReadonlyConfig.fromMap(configMap));
elasticSearchCatalog.open();
TablePath tablePath = TablePath.of("", "st_index3");
// index exists