This is an automated email from the ASF dual-hosted git repository.
jarvis pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d307ab44f2 [improve] add Elasticsearch options (#8623)
d307ab44f2 is described below
commit d307ab44f2c3e5f0640a1b30fa8642d460f20dbc
Author: fcb-xiaobo <[email protected]>
AuthorDate: Fri Feb 7 22:14:19 2025 +0800
[improve] add Elasticsearch options (#8623)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
.../elasticsearch/client/EsRestClient.java | 20 +++----
...onConfig.java => ElasticsearchBaseOptions.java} | 9 ++-
.../elasticsearch/config/ElasticsearchConfig.java | 53 +++++++++++++++++
...nkConfig.java => ElasticsearchSinkOptions.java} | 9 +--
...Config.java => ElasticsearchSourceOptions.java} | 30 +---------
.../seatunnel/elasticsearch/dto/IndexInfo.java | 10 ++--
.../elasticsearch/sink/ElasticsearchSink.java | 10 ++--
.../sink/ElasticsearchSinkFactory.java | 38 +++++++------
.../elasticsearch/source/ElasticsearchSource.java | 66 +++++++++++-----------
.../source/ElasticsearchSourceFactory.java | 28 ++++-----
.../source/ElasticsearchSourceReader.java | 10 ++--
.../source/ElasticsearchSourceSplit.java | 6 +-
.../source/ElasticsearchSourceSplitEnumerator.java | 18 +++---
.../serialize/ElasticsearchRowSerializerTest.java | 16 +++---
15 files changed, 177 insertions(+), 148 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index a473f0f685..92acb42a5f 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -169,7 +169,6 @@ public class ConnectorOptionCheckTest {
private Set<String> buildWhiteList() {
Set<String> whiteList = new HashSet<>();
- whiteList.add("ElasticsearchSourceOptions");
whiteList.add("JdbcSinkOptions");
whiteList.add("TypesenseSourceOptions");
whiteList.add("RabbitmqSourceOptions");
@@ -202,7 +201,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("SentrySinkOptions");
whiteList.add("EasysearchSinkOptions");
whiteList.add("QdrantSinkOptions");
- whiteList.add("ElasticsearchSinkOptions");
whiteList.add("MilvusSourceOptions");
whiteList.add("RocketMqSinkOptions");
whiteList.add("ClickhouseFileSinkOptions");
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 6b698531c4..ab4c35b1a6 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -26,7 +26,7 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JsonUtils;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
@@ -92,23 +92,23 @@ public class EsRestClient implements Closeable {
}
public static EsRestClient createInstance(ReadonlyConfig config) {
- List<String> hosts = config.get(EsClusterConnectionConfig.HOSTS);
- Optional<String> username =
config.getOptional(EsClusterConnectionConfig.USERNAME);
- Optional<String> password =
config.getOptional(EsClusterConnectionConfig.PASSWORD);
+ List<String> hosts = config.get(ElasticsearchBaseOptions.HOSTS);
+ Optional<String> username =
config.getOptional(ElasticsearchBaseOptions.USERNAME);
+ Optional<String> password =
config.getOptional(ElasticsearchBaseOptions.PASSWORD);
Optional<String> keystorePath = Optional.empty();
Optional<String> keystorePassword = Optional.empty();
Optional<String> truststorePath = Optional.empty();
Optional<String> truststorePassword = Optional.empty();
- boolean tlsVerifyCertificate =
config.get(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE);
+ boolean tlsVerifyCertificate =
config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE);
if (tlsVerifyCertificate) {
- keystorePath =
config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PATH);
- keystorePassword =
config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD);
- truststorePath =
config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH);
+ keystorePath =
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH);
+ keystorePassword =
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD);
+ truststorePath =
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH);
truststorePassword =
-
config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD);
+
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD);
}
- boolean tlsVerifyHostnames =
config.get(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME);
+ boolean tlsVerifyHostnames =
config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME);
return createInstance(
hosts,
username,
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
similarity index 87%
rename from
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
rename to
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
index c544807fbf..c5e688443e 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchBaseOptions.java
@@ -20,9 +20,10 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import java.io.Serializable;
import java.util.List;
-public class EsClusterConnectionConfig {
+public class ElasticsearchBaseOptions implements Serializable {
public static final Option<List<String>> HOSTS =
Options.key("hosts")
@@ -37,6 +38,12 @@ public class EsClusterConnectionConfig {
.noDefaultValue()
.withDescription("x-pack username");
+ public static final Option<String> INDEX =
+ Options.key("index")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Elasticsearch index name.Index support contains
variables of field name,such as seatunnel_${age},and the field must appear at
seatunnel row. If not, we will treat it as a normal index");
public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
new file mode 100644
index 0000000000..2ff64ee743
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Setter
+public class ElasticsearchConfig implements Serializable {
+
+ private String index;
+ private List<String> source;
+ private Map<String, Object> query;
+ private String scrollTime;
+ private int scrollSize;
+
+ private CatalogTable catalogTable;
+
+ public ElasticsearchConfig clone() {
+ ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig();
+ elasticsearchConfig.setIndex(index);
+ elasticsearchConfig.setSource(new ArrayList<>(source));
+ elasticsearchConfig.setQuery(new HashMap<>(query));
+ elasticsearchConfig.setScrollTime(scrollTime);
+ elasticsearchConfig.setScrollSize(scrollSize);
+ elasticsearchConfig.setCatalogTable(catalogTable);
+ return elasticsearchConfig;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
similarity index 88%
rename from
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
rename to
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
index fdb0300aab..22ab5e0d51 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java
@@ -29,14 +29,7 @@ import static
org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
-public class SinkConfig {
-
- public static final Option<String> INDEX =
- Options.key("index")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Elasticsearch index name.Index support contains
variables of field name,such as seatunnel_${age},and the field must appear at
seatunnel row. If not, we will treat it as a normal index");
+public class ElasticsearchSinkOptions extends ElasticsearchBaseOptions {
public static final Option<String> INDEX_TYPE =
Options.key("index_type")
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
similarity index 76%
rename from
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
rename to
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
index ffeb69d67f..a0cee3010e 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
@@ -21,13 +21,10 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
import lombok.Getter;
import lombok.Setter;
-import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -35,7 +32,7 @@ import java.util.Map;
@Getter
@Setter
-public class SourceConfig implements Serializable {
+public class ElasticsearchSourceOptions extends ElasticsearchBaseOptions {
public static final Option<List<Map<String, Object>>> INDEX_LIST =
Options.key("index_list")
@@ -43,12 +40,6 @@ public class SourceConfig implements Serializable {
.noDefaultValue()
.withDescription("index_list for multiTable sync");
- public static final Option<String> INDEX =
- Options.key("index")
- .stringType()
- .noDefaultValue()
- .withDescription("Elasticsearch index name, support *
fuzzy matching");
-
public static final Option<List<String>> SOURCE =
Options.key("source")
.listType()
@@ -84,23 +75,4 @@ public class SourceConfig implements Serializable {
Collections.singletonMap("match_all", new
HashMap<String, String>()))
.withDescription(
"Elasticsearch query language. You can control the
range of data read");
-
- private String index;
- private List<String> source;
- private Map<String, Object> query;
- private String scrollTime;
- private int scrollSize;
-
- private CatalogTable catalogTable;
-
- public SourceConfig clone() {
- SourceConfig sourceConfig = new SourceConfig();
- sourceConfig.setIndex(index);
- sourceConfig.setSource(new ArrayList<>(source));
- sourceConfig.setQuery(new HashMap<>(query));
- sourceConfig.setScrollTime(scrollTime);
- sourceConfig.setScrollSize(scrollSize);
- sourceConfig.setCatalogTable(catalogTable);
- return sourceConfig;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
index 67226341b5..5de741fc10 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
import lombok.Data;
@@ -33,10 +33,10 @@ public class IndexInfo {
public IndexInfo(String index, ReadonlyConfig config) {
this.index = index;
- type = config.get(SinkConfig.INDEX_TYPE);
- if (config.getOptional(SinkConfig.PRIMARY_KEYS).isPresent()) {
- primaryKeys = config.get(SinkConfig.PRIMARY_KEYS).toArray(new
String[0]);
+ type = config.get(ElasticsearchSinkOptions.INDEX_TYPE);
+ if
(config.getOptional(ElasticsearchSinkOptions.PRIMARY_KEYS).isPresent()) {
+ primaryKeys =
config.get(ElasticsearchSinkOptions.PRIMARY_KEYS).toArray(new String[0]);
}
- keyDelimiter = config.get(SinkConfig.KEY_DELIMITER);
+ keyDelimiter = config.get(ElasticsearchSinkOptions.KEY_DELIMITER);
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index b5dea4696b..5841e2ad3d 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -33,7 +33,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.schema.SchemaChangeType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
@@ -43,8 +43,8 @@ import java.util.List;
import java.util.Optional;
import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_RETRY_COUNT;
public class ElasticsearchSink
implements SeaTunnelSink<
@@ -92,8 +92,8 @@ public class ElasticsearchSink
return Optional.empty();
}
Catalog catalog =
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
- SchemaSaveMode schemaSaveMode =
config.get(SinkConfig.SCHEMA_SAVE_MODE);
- DataSaveMode dataSaveMode = config.get(SinkConfig.DATA_SAVE_MODE);
+ SchemaSaveMode schemaSaveMode =
config.get(ElasticsearchSinkOptions.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode =
config.get(ElasticsearchSinkOptions.DATA_SAVE_MODE);
TablePath tablePath = TablePath.of("",
catalogTable.getTableId().getTableName());
return Optional.of(
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index b290a63c44..4ba4bdb5a8 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -26,25 +26,25 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX_TYPE;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.KEY_DELIMITER;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.PRIMARY_KEYS;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.HOSTS;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.INDEX;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.INDEX_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.KEY_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_RETRY_COUNT;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.PRIMARY_KEYS;
@AutoService(Factory.class)
public class ElasticsearchSinkFactory implements TableSinkFactory {
@@ -56,7 +56,11 @@ public class ElasticsearchSinkFactory implements
TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(HOSTS, INDEX, SinkConfig.SCHEMA_SAVE_MODE,
SinkConfig.DATA_SAVE_MODE)
+ .required(
+ HOSTS,
+ INDEX,
+ ElasticsearchSinkOptions.SCHEMA_SAVE_MODE,
+ ElasticsearchSinkOptions.DATA_SAVE_MODE)
.optional(
INDEX_TYPE,
PRIMARY_KEYS,
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index a22ca17956..133a2c020b 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -39,7 +39,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchTypeConverter;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
@@ -61,13 +62,13 @@ public class ElasticsearchSource
SupportParallelism,
SupportColumnProjection {
- private final List<SourceConfig> sourceConfigList;
+ private final List<ElasticsearchConfig> elasticsearchConfigList;
private final ReadonlyConfig connectionConfig;
public ElasticsearchSource(ReadonlyConfig config) {
this.connectionConfig = config;
- boolean multiSource =
config.getOptional(SourceConfig.INDEX_LIST).isPresent();
- boolean singleSource =
config.getOptional(SourceConfig.INDEX).isPresent();
+ boolean multiSource =
config.getOptional(ElasticsearchSourceOptions.INDEX_LIST).isPresent();
+ boolean singleSource =
config.getOptional(ElasticsearchSourceOptions.INDEX).isPresent();
if (multiSource && singleSource) {
log.warn(
"Elasticsearch Source config warn: when both 'index' and
'index_list' are present in the configuration, only the 'index_list'
configuration will take effect");
@@ -78,28 +79,29 @@ public class ElasticsearchSource
ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01.getDescription());
}
if (multiSource) {
- this.sourceConfigList = createMultiSource(config);
+ this.elasticsearchConfigList = createMultiSource(config);
} else {
- this.sourceConfigList =
Collections.singletonList(parseOneIndexQueryConfig(config));
+ this.elasticsearchConfigList =
+
Collections.singletonList(parseOneIndexQueryConfig(config));
}
}
- private List<SourceConfig> createMultiSource(ReadonlyConfig config) {
- List<Map<String, Object>> configMaps =
config.get(SourceConfig.INDEX_LIST);
+ private List<ElasticsearchConfig> createMultiSource(ReadonlyConfig config)
{
+ List<Map<String, Object>> configMaps =
config.get(ElasticsearchSourceOptions.INDEX_LIST);
List<ReadonlyConfig> configList =
configMaps.stream().map(ReadonlyConfig::fromMap).collect(Collectors.toList());
- List<SourceConfig> sourceConfigList = new
ArrayList<>(configList.size());
+ List<ElasticsearchConfig> elasticsearchConfigList = new
ArrayList<>(configList.size());
for (ReadonlyConfig readonlyConfig : configList) {
- SourceConfig sourceConfig =
parseOneIndexQueryConfig(readonlyConfig);
- sourceConfigList.add(sourceConfig);
+ ElasticsearchConfig elasticsearchConfig =
parseOneIndexQueryConfig(readonlyConfig);
+ elasticsearchConfigList.add(elasticsearchConfig);
}
- return sourceConfigList;
+ return elasticsearchConfigList;
}
- private SourceConfig parseOneIndexQueryConfig(ReadonlyConfig
readonlyConfig) {
+ private ElasticsearchConfig parseOneIndexQueryConfig(ReadonlyConfig
readonlyConfig) {
- Map<String, Object> query = readonlyConfig.get(SourceConfig.QUERY);
- String index = readonlyConfig.get(SourceConfig.INDEX);
+ Map<String, Object> query =
readonlyConfig.get(ElasticsearchSourceOptions.QUERY);
+ String index = readonlyConfig.get(ElasticsearchSourceOptions.INDEX);
CatalogTable catalogTable;
List<String> source;
@@ -112,8 +114,8 @@ public class ElasticsearchSource
catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
source =
Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
} else {
- source = readonlyConfig.get(SourceConfig.SOURCE);
- arrayColumn = readonlyConfig.get(SourceConfig.ARRAY_COLUMN);
+ source = readonlyConfig.get(ElasticsearchSourceOptions.SOURCE);
+ arrayColumn =
readonlyConfig.get(ElasticsearchSourceOptions.ARRAY_COLUMN);
Map<String, BasicTypeDefine<EsType>> esFieldType =
getFieldTypeMapping(index, source);
if (CollectionUtils.isEmpty(source)) {
source = new ArrayList<>(esFieldType.keySet());
@@ -154,17 +156,17 @@ public class ElasticsearchSource
"");
}
- String scrollTime = readonlyConfig.get(SourceConfig.SCROLL_TIME);
- int scrollSize = readonlyConfig.get(SourceConfig.SCROLL_SIZE);
- SourceConfig sourceConfig = new SourceConfig();
- sourceConfig.setSource(source);
- sourceConfig.setCatalogTable(catalogTable);
- sourceConfig.setQuery(query);
- sourceConfig.setScrollTime(scrollTime);
- sourceConfig.setScrollSize(scrollSize);
- sourceConfig.setIndex(index);
- sourceConfig.setCatalogTable(catalogTable);
- return sourceConfig;
+ String scrollTime =
readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_TIME);
+ int scrollSize =
readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_SIZE);
+ ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig();
+ elasticsearchConfig.setSource(source);
+ elasticsearchConfig.setCatalogTable(catalogTable);
+ elasticsearchConfig.setQuery(query);
+ elasticsearchConfig.setScrollTime(scrollTime);
+ elasticsearchConfig.setScrollSize(scrollSize);
+ elasticsearchConfig.setIndex(index);
+ elasticsearchConfig.setCatalogTable(catalogTable);
+ return elasticsearchConfig;
}
@Override
@@ -179,8 +181,8 @@ public class ElasticsearchSource
@Override
public List<CatalogTable> getProducedCatalogTables() {
- return sourceConfigList.stream()
- .map(SourceConfig::getCatalogTable)
+ return elasticsearchConfigList.stream()
+ .map(ElasticsearchConfig::getCatalogTable)
.collect(Collectors.toList());
}
@@ -195,7 +197,7 @@ public class ElasticsearchSource
createEnumerator(
SourceSplitEnumerator.Context<ElasticsearchSourceSplit>
enumeratorContext) {
return new ElasticsearchSourceSplitEnumerator(
- enumeratorContext, connectionConfig, sourceConfigList);
+ enumeratorContext, connectionConfig, elasticsearchConfigList);
}
@Override
@@ -204,7 +206,7 @@ public class ElasticsearchSource
SourceSplitEnumerator.Context<ElasticsearchSourceSplit>
enumeratorContext,
ElasticsearchSourceState sourceState) {
return new ElasticsearchSourceSplitEnumerator(
- enumeratorContext, sourceState, connectionConfig,
sourceConfigList);
+ enumeratorContext, sourceState, connectionConfig,
elasticsearchConfigList);
}
@VisibleForTesting
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index 8f41256e37..5485e13fad 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -29,20 +29,20 @@ import com.google.auto.service.AutoService;
import java.io.Serializable;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PATH;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX_LIST;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.QUERY;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_TIME;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.HOSTS;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.INDEX;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.INDEX_LIST;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.QUERY;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SCROLL_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SCROLL_TIME;
@AutoService(Factory.class)
public class ElasticsearchSourceFactory implements TableSourceFactory {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index a58c2c622d..d46dc5a1b4 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.ElasticsearchRecord;
@@ -76,7 +76,7 @@ public class ElasticsearchSourceReader
SeaTunnelRowType seaTunnelRowType =
split.getSeaTunnelRowType();
SeaTunnelRowDeserializer deserializer =
new DefaultSeaTunnelRowDeserializer(seaTunnelRowType);
- SourceConfig sourceIndexInfo = split.getSourceConfig();
+ ElasticsearchConfig sourceIndexInfo =
split.getElasticsearchConfig();
ScrollResult scrollResult =
esRestClient.searchByScroll(
sourceIndexInfo.getIndex(),
@@ -103,11 +103,11 @@ public class ElasticsearchSourceReader
private void outputFromScrollResult(
ScrollResult scrollResult,
- SourceConfig sourceConfig,
+ ElasticsearchConfig elasticsearchConfig,
Collector<SeaTunnelRow> output,
SeaTunnelRowDeserializer deserializer) {
- List<String> source = sourceConfig.getSource();
- String tableId =
sourceConfig.getCatalogTable().getTablePath().toString();
+ List<String> source = elasticsearchConfig.getSource();
+ String tableId =
elasticsearchConfig.getCatalogTable().getTablePath().toString();
for (Map<String, Object> doc : scrollResult.getDocs()) {
SeaTunnelRow seaTunnelRow =
deserializer.deserialize(new ElasticsearchRecord(doc,
source, tableId));
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
index 3c7d25b5b4..8a0aa99740 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplit.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -33,10 +33,10 @@ public class ElasticsearchSourceSplit implements
SourceSplit {
private String splitId;
- @Getter private SourceConfig sourceConfig;
+ @Getter private ElasticsearchConfig elasticsearchConfig;
public SeaTunnelRowType getSeaTunnelRowType() {
- return sourceConfig.getCatalogTable().getSeaTunnelRowType();
+ return elasticsearchConfig.getCatalogTable().getSeaTunnelRowType();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index 5e3356ebd6..7452e630d3 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
@@ -52,22 +52,22 @@ public class ElasticsearchSourceSplitEnumerator
private Map<Integer, List<ElasticsearchSourceSplit>> pendingSplit;
- private final List<SourceConfig> sourceConfigs;
+ private final List<ElasticsearchConfig> elasticsearchConfigs;
private volatile boolean shouldEnumerate;
public ElasticsearchSourceSplitEnumerator(
SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
ReadonlyConfig connConfig,
- List<SourceConfig> sourceConfigs) {
- this(context, null, connConfig, sourceConfigs);
+ List<ElasticsearchConfig> elasticsearchConfigs) {
+ this(context, null, connConfig, elasticsearchConfigs);
}
public ElasticsearchSourceSplitEnumerator(
SourceSplitEnumerator.Context<ElasticsearchSourceSplit> context,
ElasticsearchSourceState sourceState,
ReadonlyConfig connConfig,
- List<SourceConfig> sourceConfigs) {
+ List<ElasticsearchConfig> elasticsearchConfigs) {
this.context = context;
this.connConfig = connConfig;
this.pendingSplit = new HashMap<>();
@@ -76,7 +76,7 @@ public class ElasticsearchSourceSplitEnumerator
this.shouldEnumerate = sourceState.isShouldEnumerate();
this.pendingSplit.putAll(sourceState.getPendingSplit());
}
- this.sourceConfigs = sourceConfigs;
+ this.elasticsearchConfigs = elasticsearchConfigs;
}
@Override
@@ -139,9 +139,9 @@ public class ElasticsearchSourceSplitEnumerator
private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
List<ElasticsearchSourceSplit> splits = new ArrayList<>();
- for (SourceConfig sourceConfig : sourceConfigs) {
+ for (ElasticsearchConfig elasticsearchConfig : elasticsearchConfigs) {
- String index = sourceConfig.getIndex();
+ String index = elasticsearchConfig.getIndex();
List<IndexDocsCount> indexDocsCounts =
esRestClient.getIndexDocsCount(index);
indexDocsCounts =
indexDocsCounts.stream()
@@ -149,7 +149,7 @@ public class ElasticsearchSourceSplitEnumerator
.sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount))
.collect(Collectors.toList());
for (IndexDocsCount indexDocsCount : indexDocsCounts) {
- SourceConfig cloneCfg = sourceConfig.clone();
+ ElasticsearchConfig cloneCfg = elasticsearchConfig.clone();
cloneCfg.setIndex(indexDocsCount.getIndex());
splits.add(
new ElasticsearchSourceSplit(
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
index 5a269e0737..2131bdc942 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
@@ -43,8 +43,8 @@ public class ElasticsearchRowSerializerTest {
String index = "st_index";
String primaryKey = "id";
Map<String, Object> confMap = new HashMap<>();
- confMap.put(SinkConfig.INDEX.key(), index);
- confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
+ confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
+ confMap.put(ElasticsearchSinkOptions.PRIMARY_KEYS.key(),
Arrays.asList(primaryKey));
ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
@@ -83,7 +83,7 @@ public class ElasticsearchRowSerializerTest {
public void testSerializeUpsertWithoutKey() {
String index = "st_index";
Map<String, Object> confMap = new HashMap<>();
- confMap.put(SinkConfig.INDEX.key(), index);
+ confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
@@ -121,8 +121,8 @@ public class ElasticsearchRowSerializerTest {
String index = "st_index";
String primaryKey = "id";
Map<String, Object> confMap = new HashMap<>();
- confMap.put(SinkConfig.INDEX.key(), index);
- confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
+ confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
+ confMap.put(ElasticsearchSinkOptions.PRIMARY_KEYS.key(),
Arrays.asList(primaryKey));
ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =
@@ -159,8 +159,8 @@ public class ElasticsearchRowSerializerTest {
String index = "st_index";
String primaryKey = "id";
Map<String, Object> confMap = new HashMap<>();
- confMap.put(SinkConfig.INDEX.key(), index);
- confMap.put(SinkConfig.PRIMARY_KEYS.key(), Arrays.asList(primaryKey));
+ confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
+ confMap.put(ElasticsearchSinkOptions.PRIMARY_KEYS.key(),
Arrays.asList(primaryKey));
ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
ElasticsearchClusterInfo clusterInfo =