This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 716a36ac3e [Feature][Connector] add elasticsearch save_mode (#6046)
716a36ac3e is described below
commit 716a36ac3e24c9e783041366d1bccc1dc854ea90
Author: 老王 <[email protected]>
AuthorDate: Fri Jan 5 14:23:32 2024 +0800
[Feature][Connector] add elasticsearch save_mode (#6046)
---
docs/en/connector-v2/sink/Elasticsearch.md | 69 ++++++++++++++++------
.../catalog/ElasticSearchCatalog.java | 25 +++++---
.../catalog/ElasticSearchCatalogFactory.java | 14 +++--
.../seatunnel/elasticsearch/config/SinkConfig.java | 21 +++++++
.../elasticsearch/sink/ElasticsearchSink.java | 45 ++++++++++++--
.../sink/ElasticsearchSinkFactory.java | 3 +-
.../connector/elasticsearch/ElasticsearchIT.java | 37 ++++++++++++
.../elasticsearch_source_and_sink.conf | 2 +
8 files changed, 180 insertions(+), 36 deletions(-)
diff --git a/docs/en/connector-v2/sink/Elasticsearch.md
b/docs/en/connector-v2/sink/Elasticsearch.md
index a08d4ec3b2..af61df2288 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -19,24 +19,26 @@ Engine Supported
## Options
-| name | type | required | default value |
-|-------------------------|---------|----------|---------------|
-| hosts | array | yes | - |
-| index | string | yes | - |
-| index_type | string | no | |
-| primary_keys | list | no | |
-| key_delimiter | string | no | `_` |
-| username | string | no | |
-| password | string | no | |
-| max_retry_count | int | no | 3 |
-| max_batch_size | int | no | 10 |
-| tls_verify_certificate | boolean | no | true |
-| tls_verify_hostnames | boolean | no | true |
-| tls_keystore_path | string | no | - |
-| tls_keystore_password | string | no | - |
-| tls_truststore_path | string | no | - |
-| tls_truststore_password | string | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|-------------------------|---------|----------|------------------------------|
+| hosts | array | yes | - |
+| index | string | yes | - |
+| schema_save_mode | string | yes | CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode | string | yes | APPEND_DATA |
+| index_type | string | no | |
+| primary_keys | list | no | |
+| key_delimiter | string | no | `_` |
+| username | string | no | |
+| password | string | no | |
+| max_retry_count | int | no | 3 |
+| max_batch_size | int | no | 10 |
+| tls_verify_certificate | boolean | no | true |
+| tls_verify_hostnames | boolean | no | true |
+| tls_keystore_path | string | no | - |
+| tls_keystore_password | string | no | - |
+| tls_truststore_path | string | no | - |
+| tls_truststore_password | string | no | - |
+| common-options | | no | - |
### hosts [array]
@@ -103,6 +105,22 @@ The key password for the trust store specified
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
+### schema_save_mode
+
+Before the synchronization task starts, choose how to handle a table (index)
structure that already exists on the target side.
+Option introduction:
+RECREATE_SCHEMA: Creates the table when it does not exist; deletes and rebuilds it
when the table already exists
+CREATE_SCHEMA_WHEN_NOT_EXIST: Creates the table when it does not exist;
skips creation when the table already exists
+ERROR_WHEN_SCHEMA_NOT_EXIST: Reports an error when the table does not
exist
+
+### data_save_mode
+
+Before the synchronization task starts, choose how to handle data that
already exists on the target side.
+Option introduction:
+DROP_DATA: Preserve the database structure and delete the data
+APPEND_DATA: Preserve the database structure and preserve the data
+ERROR_WHEN_DATA_EXISTS: Report an error when data already exists
+
## Examples
Simple
@@ -173,6 +191,21 @@ sink {
}
```
+SAVE_MODE (configure `schema_save_mode` and `data_save_mode`)
+
+```hocon
+sink {
+ Elasticsearch {
+ hosts = ["https://localhost:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 1d5ba105f0..547db6c210 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.HashMap;
@@ -52,6 +53,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
*
* <p>In ElasticSearch, we use the index as the database and table.
*/
+@Slf4j
public class ElasticSearchCatalog implements Catalog {
private static final Logger LOGGER =
LoggerFactory.getLogger(ElasticSearchCatalog.class);
@@ -108,11 +110,12 @@ public class ElasticSearchCatalog implements Catalog {
List<IndexDocsCount> indexDocsCount =
esRestClient.getIndexDocsCount(databaseName);
return true;
} catch (Exception e) {
- throw new CatalogException(
+ log.error(
String.format(
"Failed to check if catalog %s database %s exists",
catalogName, databaseName),
e);
+ return false;
}
}
@@ -177,13 +180,6 @@ public class ElasticSearchCatalog implements Catalog {
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
// Create the index
checkNotNull(tablePath, "tablePath cannot be null");
- if (tableExists(tablePath)) {
- if (ignoreIfExists) {
- return;
- } else {
- throw new TableAlreadyExistException(catalogName, tablePath,
null);
- }
- }
esRestClient.createIndex(tablePath.getTableName());
}
@@ -217,6 +213,19 @@ public class ElasticSearchCatalog implements Catalog {
dropTable(tablePath, ignoreIfNotExists);
}
+ @Override
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
+ dropTable(tablePath, ignoreIfNotExists);
+ createTable(tablePath, null, ignoreIfNotExists);
+ }
+
+ @Override
+ public boolean isExistsData(TablePath tablePath) {
+ final List<IndexDocsCount> indexDocsCount =
+ esRestClient.getIndexDocsCount(tablePath.getTableName());
+ return indexDocsCount.get(0).getDocsCount() > 0;
+ }
+
private Map<String, String> buildTableOptions(TablePath tablePath) {
Map<String, String> options = new HashMap<>();
options.put("connector", "elasticsearch");
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
index 55e3460f90..df6624241f 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
@@ -17,28 +17,32 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+
+import com.google.auto.service.AutoService;
+@AutoService(Factory.class)
public class ElasticSearchCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
- // todo:
- return null;
+ Config config = options.toConfig();
+ return new ElasticSearchCatalog(catalogName, "", config);
}
@Override
public String factoryIdentifier() {
- // todo:
return "Elasticsearch";
}
@Override
public OptionRule optionRule() {
- // todo:
- return null;
+ return OptionRule.builder().build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index beccebe3c1..fdb0300aab 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -19,9 +19,16 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import java.util.Arrays;
import java.util.List;
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
public class SinkConfig {
public static final Option<String> INDEX =
@@ -62,4 +69,18 @@ public class SinkConfig {
.intType()
.defaultValue(3)
.withDescription("one bulk request max try count");
+
+ public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription("schema_save_mode");
+
+ public static final Option<DataSaveMode> DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .singleChoice(
+ DataSaveMode.class,
+ Arrays.asList(DROP_DATA, APPEND_DATA,
ERROR_WHEN_DATA_EXISTS))
+ .defaultValue(APPEND_DATA)
+ .withDescription("data_save_mode");
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index f1ab596b24..b4c67417d0 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -20,26 +20,40 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
import com.google.auto.service.AutoService;
+import java.util.Optional;
+
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
@AutoService(SeaTunnelSink.class)
public class ElasticsearchSink
implements SeaTunnelSink<
- SeaTunnelRow,
- ElasticsearchSinkState,
- ElasticsearchCommitInfo,
- ElasticsearchAggregatedCommitInfo> {
+ SeaTunnelRow,
+ ElasticsearchSinkState,
+ ElasticsearchCommitInfo,
+ ElasticsearchAggregatedCommitInfo>,
+ SupportSaveMode {
private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
@@ -75,4 +89,27 @@ public class ElasticsearchSink
return new ElasticsearchSinkWriter(
context, seaTunnelRowType, pluginConfig, maxBatchSize,
maxRetryCount);
}
+
+ @Override
+ public Optional<SaveModeHandler> getSaveModeHandler() {
+ CatalogFactory catalogFactory =
+ discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ CatalogFactory.class,
+ getPluginName());
+ if (catalogFactory == null) {
+ return Optional.empty();
+ }
+ ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(pluginConfig);
+ Catalog catalog =
+
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(),
readonlyConfig);
+ SchemaSaveMode schemaSaveMode =
readonlyConfig.get(SinkConfig.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode =
readonlyConfig.get(SinkConfig.DATA_SAVE_MODE);
+
+ TablePath tablePath = TablePath.of("",
readonlyConfig.get(SinkConfig.INDEX));
+ catalog.open();
+ return Optional.of(
+ new DefaultSaveModeHandler(
+ schemaSaveMode, dataSaveMode, catalog, tablePath,
null, null));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index 06561b4748..9ecf44d479 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
import com.google.auto.service.AutoService;
@@ -49,7 +50,7 @@ public class ElasticsearchSinkFactory implements
TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(HOSTS, INDEX)
+ .required(HOSTS, INDEX, SinkConfig.SCHEMA_SAVE_MODE,
SinkConfig.DATA_SAVE_MODE)
.optional(
INDEX_TYPE,
PRIMARY_KEYS,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index f64097ac6e..98778b01c4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -20,8 +20,11 @@ package org.apache.seatunnel.e2e.connector.elasticsearch;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.JsonUtils;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
@@ -49,6 +52,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -255,4 +259,37 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
}
container.close();
}
+
+ @TestTemplate
+ public void testCatalog(TestContainer container2) throws IOException,
InterruptedException {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("username", "elastic");
+ configMap.put("password", "elasticsearch");
+ configMap.put("hosts", Arrays.asList("https://" +
container.getHttpHostAddress()));
+ configMap.put("index", "st_index3");
+ configMap.put("tls_verify_certificate", false);
+ configMap.put("tls_verify_hostname", false);
+ configMap.put("index_type", "st");
+ final ElasticSearchCatalog elasticSearchCatalog =
+ new ElasticSearchCatalog("Elasticsearch", "",
ConfigFactory.parseMap(configMap));
+ elasticSearchCatalog.open();
+ TablePath tablePath = TablePath.of("", "st_index3");
+ // index exists
+ final boolean existsBefore =
elasticSearchCatalog.tableExists(tablePath);
+ Assertions.assertFalse(existsBefore);
+ // create index
+ elasticSearchCatalog.createTable(tablePath, null, false);
+ final boolean existsAfter =
elasticSearchCatalog.tableExists(tablePath);
+ Assertions.assertTrue(existsAfter);
+ // data exists?
+ final boolean existsData =
elasticSearchCatalog.isExistsData(tablePath);
+ Assertions.assertFalse(existsData);
+ // truncate
+ elasticSearchCatalog.truncateTable(tablePath, false);
+ Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath));
+ // drop
+ elasticSearchCatalog.dropTable(tablePath, false);
+ Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath));
+ elasticSearchCatalog.close();
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
index 32164a182b..c4f0bb91ff 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -69,5 +69,7 @@ sink {
index = "st_index2"
index_type = "st"
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="APPEND_DATA"
}
}
\ No newline at end of file