This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 716a36ac3e [Feature][Connector] add elasticsearch save_mode  (#6046)
716a36ac3e is described below

commit 716a36ac3e24c9e783041366d1bccc1dc854ea90
Author: 老王 <[email protected]>
AuthorDate: Fri Jan 5 14:23:32 2024 +0800

    [Feature][Connector] add elasticsearch save_mode  (#6046)
---
 docs/en/connector-v2/sink/Elasticsearch.md         | 69 ++++++++++++++++------
 .../catalog/ElasticSearchCatalog.java              | 25 +++++---
 .../catalog/ElasticSearchCatalogFactory.java       | 14 +++--
 .../seatunnel/elasticsearch/config/SinkConfig.java | 21 +++++++
 .../elasticsearch/sink/ElasticsearchSink.java      | 45 ++++++++++++--
 .../sink/ElasticsearchSinkFactory.java             |  3 +-
 .../connector/elasticsearch/ElasticsearchIT.java   | 37 ++++++++++++
 .../elasticsearch_source_and_sink.conf             |  2 +
 8 files changed, 180 insertions(+), 36 deletions(-)

diff --git a/docs/en/connector-v2/sink/Elasticsearch.md 
b/docs/en/connector-v2/sink/Elasticsearch.md
index a08d4ec3b2..af61df2288 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -19,24 +19,26 @@ Engine Supported
 
 ## Options
 
-|          name           |  type   | required | default value |
-|-------------------------|---------|----------|---------------|
-| hosts                   | array   | yes      | -             |
-| index                   | string  | yes      | -             |
-| index_type              | string  | no       |               |
-| primary_keys            | list    | no       |               |
-| key_delimiter           | string  | no       | `_`           |
-| username                | string  | no       |               |
-| password                | string  | no       |               |
-| max_retry_count         | int     | no       | 3             |
-| max_batch_size          | int     | no       | 10            |
-| tls_verify_certificate  | boolean | no       | true          |
-| tls_verify_hostnames    | boolean | no       | true          |
-| tls_keystore_path       | string  | no       | -             |
-| tls_keystore_password   | string  | no       | -             |
-| tls_truststore_path     | string  | no       | -             |
-| tls_truststore_password | string  | no       | -             |
-| common-options          |         | no       | -             |
+|          name           |  type   | required |        default value         |
+|-------------------------|---------|----------|------------------------------|
+| hosts                   | array   | yes      | -                            |
+| index                   | string  | yes      | -                            |
+| schema_save_mode        | string  | yes      | CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode          | string  | yes      | APPEND_DATA                  |
+| index_type              | string  | no       |                              |
+| primary_keys            | list    | no       |                              |
+| key_delimiter           | string  | no       | `_`                          |
+| username                | string  | no       |                              |
+| password                | string  | no       |                              |
+| max_retry_count         | int     | no       | 3                            |
+| max_batch_size          | int     | no       | 10                           |
+| tls_verify_certificate  | boolean | no       | true                         |
+| tls_verify_hostnames    | boolean | no       | true                         |
+| tls_keystore_path       | string  | no       | -                            |
+| tls_keystore_password   | string  | no       | -                            |
+| tls_truststore_path     | string  | no       | -                            |
+| tls_truststore_password | string  | no       | -                            |
+| common-options          |         | no       | -                            |
 
 ### hosts [array]
 
@@ -103,6 +105,22 @@ The key password for the trust store specified
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
 
+### schema_save_mode
+
+Before the synchronization task starts, choose how the existing index (table) structure on the target side is handled.
+Option introduction:  
+RECREATE_SCHEMA: Create the index when it does not exist; drop and re-create it when it already exists  
+CREATE_SCHEMA_WHEN_NOT_EXIST: Create the index when it does not exist; skip creation when it already exists  
+ERROR_WHEN_SCHEMA_NOT_EXIST: Report an error when the index does not exist
+
+### data_save_mode
+
+Before the synchronization task starts, choose how data that already exists on the target side is handled.
+Option introduction:  
+DROP_DATA: Keep the index structure and delete the existing data  
+APPEND_DATA: Keep the index structure and keep the existing data  
+ERROR_WHEN_DATA_EXISTS: Report an error when the index already contains data
+
 ## Examples
 
 Simple
@@ -173,6 +191,21 @@ sink {
 }
 ```
 
+SAVE_MODE (configure `schema_save_mode` and `data_save_mode`)
+
+```hocon
+sink {
+    Elasticsearch {
+        hosts = ["https://localhost:9200"]
+        username = "elastic"
+        password = "elasticsearch"
+        index = "seatunnel"
+        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+        data_save_mode = "APPEND_DATA"
+    }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 1d5ba105f0..547db6c210 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -52,6 +53,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
  *
  * <p>In ElasticSearch, we use the index as the database and table.
  */
+@Slf4j
 public class ElasticSearchCatalog implements Catalog {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticSearchCatalog.class);
@@ -108,11 +110,12 @@ public class ElasticSearchCatalog implements Catalog {
             List<IndexDocsCount> indexDocsCount = 
esRestClient.getIndexDocsCount(databaseName);
             return true;
         } catch (Exception e) {
-            throw new CatalogException(
+            log.error(
                     String.format(
                             "Failed to check if catalog %s database %s exists",
                             catalogName, databaseName),
                     e);
+            return false;
         }
     }
 
@@ -177,13 +180,6 @@ public class ElasticSearchCatalog implements Catalog {
             throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
         // Create the index
         checkNotNull(tablePath, "tablePath cannot be null");
-        if (tableExists(tablePath)) {
-            if (ignoreIfExists) {
-                return;
-            } else {
-                throw new TableAlreadyExistException(catalogName, tablePath, 
null);
-            }
-        }
         esRestClient.createIndex(tablePath.getTableName());
     }
 
@@ -217,6 +213,31 @@ public class ElasticSearchCatalog implements Catalog {
         dropTable(tablePath, ignoreIfNotExists);
     }
 
+    /**
+     * Empties the index by dropping it and creating it again.
+     *
+     * <p>NOTE(review): the index is re-created via {@code createTable(tablePath, null, ...)},
+     * so settings/mappings of the dropped index are presumably not carried over; confirm.
+     */
+    @Override
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
+        dropTable(tablePath, ignoreIfNotExists);
+        createTable(tablePath, null, ignoreIfNotExists);
+    }
+
+    /**
+     * Returns whether the index {@code tablePath.getTableName()} contains at least one document.
+     *
+     * <p>An empty doc-count response is treated as "no data" rather than failing with an
+     * {@link IndexOutOfBoundsException}.
+     */
+    @Override
+    public boolean isExistsData(TablePath tablePath) {
+        final List<IndexDocsCount> indexDocsCount =
+                esRestClient.getIndexDocsCount(tablePath.getTableName());
+        return !indexDocsCount.isEmpty() && indexDocsCount.get(0).getDocsCount() > 0;
+    }
+
     private Map<String, String> buildTableOptions(TablePath tablePath) {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "elasticsearch");
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
index 55e3460f90..df6624241f 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java
@@ -17,28 +17,32 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+
+import com.google.auto.service.AutoService;
 
+@AutoService(Factory.class)
 public class ElasticSearchCatalogFactory implements CatalogFactory {
 
     @Override
     public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
-        // todo:
-        return null;
+        Config config = options.toConfig();
+        return new ElasticSearchCatalog(catalogName, "", config);
     }
 
     @Override
     public String factoryIdentifier() {
-        // todo:
         return "Elasticsearch";
     }
 
     @Override
     public OptionRule optionRule() {
-        // todo:
-        return null;
+        return OptionRule.builder().build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index beccebe3c1..fdb0300aab 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -19,9 +19,16 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
+import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static 
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
 public class SinkConfig {
 
     public static final Option<String> INDEX =
@@ -62,4 +69,18 @@ public class SinkConfig {
                     .intType()
                     .defaultValue(3)
                     .withDescription("one bulk request max try count");
+
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription("schema_save_mode");
+
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .singleChoice(
+                            DataSaveMode.class,
+                            Arrays.asList(DROP_DATA, APPEND_DATA, 
ERROR_WHEN_DATA_EXISTS))
+                    .defaultValue(APPEND_DATA)
+                    .withDescription("data_save_mode");
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index f1ab596b24..b4c67417d0 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -20,26 +20,40 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
 
 import com.google.auto.service.AutoService;
 
+import java.util.Optional;
+
+import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
 
 @AutoService(SeaTunnelSink.class)
 public class ElasticsearchSink
         implements SeaTunnelSink<
-                SeaTunnelRow,
-                ElasticsearchSinkState,
-                ElasticsearchCommitInfo,
-                ElasticsearchAggregatedCommitInfo> {
+                        SeaTunnelRow,
+                        ElasticsearchSinkState,
+                        ElasticsearchCommitInfo,
+                        ElasticsearchAggregatedCommitInfo>,
+                SupportSaveMode {
 
     private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
@@ -75,4 +89,27 @@ public class ElasticsearchSink
         return new ElasticsearchSinkWriter(
                 context, seaTunnelRowType, pluginConfig, maxBatchSize, 
maxRetryCount);
     }
+
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        CatalogFactory catalogFactory =
+                discoverFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        CatalogFactory.class,
+                        getPluginName());
+        if (catalogFactory == null) {
+            return Optional.empty();
+        }
+        ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(pluginConfig);
+        Catalog catalog =
+                
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), 
readonlyConfig);
+        SchemaSaveMode schemaSaveMode = 
readonlyConfig.get(SinkConfig.SCHEMA_SAVE_MODE);
+        DataSaveMode dataSaveMode = 
readonlyConfig.get(SinkConfig.DATA_SAVE_MODE);
+
+        TablePath tablePath = TablePath.of("", 
readonlyConfig.get(SinkConfig.INDEX));
+        catalog.open();
+        return Optional.of(
+                new DefaultSaveModeHandler(
+                        schemaSaveMode, dataSaveMode, catalog, tablePath, 
null, null));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
index 06561b4748..9ecf44d479 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
 
 import com.google.auto.service.AutoService;
 
@@ -49,7 +50,7 @@ public class ElasticsearchSinkFactory implements 
TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(HOSTS, INDEX)
+                .required(HOSTS, INDEX, SinkConfig.SCHEMA_SAVE_MODE, 
SinkConfig.DATA_SAVE_MODE)
                 .optional(
                         INDEX_TYPE,
                         PRIMARY_KEYS,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index f64097ac6e..98778b01c4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -20,8 +20,11 @@ package org.apache.seatunnel.e2e.connector.elasticsearch;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import org.apache.seatunnel.e2e.common.TestResource;
@@ -49,6 +52,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -255,4 +259,45 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
         }
         container.close();
     }
+
+    /**
+     * Exercises {@link ElasticSearchCatalog} against the test cluster: index existence checks,
+     * index creation, the data-existence check, truncation and dropping of {@code st_index3}.
+     *
+     * <p>The {@link TestContainer} argument is only supplied by {@code @TestTemplate}; the
+     * cluster address is taken from the Elasticsearch {@code container} field.
+     */
+    @TestTemplate
+    public void testCatalog(TestContainer container2) throws IOException, InterruptedException {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("username", "elastic");
+        configMap.put("password", "elasticsearch");
+        configMap.put("hosts", Arrays.asList("https://" + container.getHttpHostAddress()));
+        configMap.put("index", "st_index3");
+        configMap.put("tls_verify_certificate", false);
+        configMap.put("tls_verify_hostname", false);
+        configMap.put("index_type", "st");
+        final ElasticSearchCatalog elasticSearchCatalog =
+                new ElasticSearchCatalog("Elasticsearch", "", ConfigFactory.parseMap(configMap));
+        elasticSearchCatalog.open();
+        // close the catalog even when an assertion fails
+        try {
+            TablePath tablePath = TablePath.of("", "st_index3");
+            // the index must not exist yet
+            Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath));
+            // create the index
+            elasticSearchCatalog.createTable(tablePath, null, false);
+            Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath));
+            // a freshly created index holds no documents
+            Assertions.assertFalse(elasticSearchCatalog.isExistsData(tablePath));
+            // truncating drops and re-creates the index, so it must still exist
+            elasticSearchCatalog.truncateTable(tablePath, false);
+            Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath));
+            // drop the index
+            elasticSearchCatalog.dropTable(tablePath, false);
+            Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath));
+        } finally {
+            elasticSearchCatalog.close();
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
index 32164a182b..c4f0bb91ff 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf
@@ -69,5 +69,7 @@ sink {
 
     index = "st_index2"
     index_type = "st"
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
   }
 }
\ No newline at end of file

Reply via email to