This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3ceb57f279 [improve] EasySearch support schema_save_mode/data_save_mode (#9310)
3ceb57f279 is described below

commit 3ceb57f279d834797e8ce407e7932fec4c2eb7a3
Author: CosmosNi <[email protected]>
AuthorDate: Sat May 17 09:41:05 2025 +0800

    [improve] EasySearch support schema_save_mode/data_save_mode (#9310)
---
 docs/en/connector-v2/sink/Easysearch.md            | 40 +++++++++-
 docs/en/connector-v2/sink/Elasticsearch.md         |  2 +-
 docs/zh/connector-v2/sink/Easysearch.md            | 40 +++++++++-
 docs/zh/connector-v2/sink/Elasticsearch.md         |  2 +-
 docs/zh/connector-v2/sink/Jdbc.md                  |  2 +-
 .../easysearch/catalog/EasysearchCatalog.java      | 64 +++++++++++++++-
 .../catalog/EasysearchCatalogFactory.java          | 11 +--
 .../easysearch/client/EasysearchClient.java        | 23 ++++++
 .../easysearch/config/EasysearchSinkOptions.java   | 21 ++++++
 .../seatunnel/easysearch/sink/EasysearchSink.java  | 57 +++++++++++++-
 .../easysearch/sink/EasysearchSinkFactory.java     |  4 +-
 .../e2e/connector/easysearch/EasysearchIT.java     | 86 ++++++++++++++++++++--
 .../easysearch_source_and_sink_with_save_mode.conf | 75 +++++++++++++++++++
 13 files changed, 397 insertions(+), 30 deletions(-)

diff --git a/docs/en/connector-v2/sink/Easysearch.md 
b/docs/en/connector-v2/sink/Easysearch.md
index 2c8c7ce4f4..cb819eee5d 100644
--- a/docs/en/connector-v2/sink/Easysearch.md
+++ b/docs/en/connector-v2/sink/Easysearch.md
@@ -61,6 +61,8 @@ Engine Supported
 | tls_keystore_password   | string  | no       | -             |
 | tls_truststore_path     | string  | no       | -             |
 | tls_truststore_password | string  | no       | -             |
+| schema_save_mode        | enum    | no       | CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode          | enum    | no       | APPEND_DATA   |
 | common-options          |         | no       | -             |
 
 ### hosts [array]
@@ -120,6 +122,21 @@ The path to PEM or JKS trust store. This file must be 
readable by the operating
 
 The key password for the trust store specified
 
+### schema_save_mode [enum]
+
+Choose how to handle the target-side schema before starting the synchronization task:
+- `RECREATE_SCHEMA`: Creates the table if it doesn't exist, and deletes and recreates it if it does.
+- `CREATE_SCHEMA_WHEN_NOT_EXIST`: Creates the table if it doesn't exist, skips creation if it does.
+- `ERROR_WHEN_SCHEMA_NOT_EXIST`: Throws an error if the table doesn't exist.
+- `IGNORE`: Ignores schema handling.
+
+### data_save_mode [enum]
+
+Choose how to handle the target-side data before starting the synchronization task:
+- `DROP_DATA`: Preserves the database structure and deletes the data.
+- `APPEND_DATA`: Preserves the database structure and the data.
+- `ERROR_WHEN_DATA_EXISTS`: Reports an error when data exists.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details
@@ -144,7 +161,7 @@ sink {
     Easysearch {
         hosts = ["localhost:9200"]
         index = "seatunnel-${age}"
-        
+
         # cdc required options
         primary_keys = ["key1", "key2", ...]
     }
@@ -159,7 +176,7 @@ sink {
         hosts = ["https://localhost:9200"]
         username = "admin"
         password = "admin"
-        
+
         tls_verify_certificate = false
     }
 }
@@ -173,7 +190,7 @@ sink {
         hosts = ["https://localhost:9200"]
         username = "admin"
         password = "admin"
-        
+
         tls_verify_hostname = false
     }
 }
@@ -187,13 +204,28 @@ sink {
         hosts = ["https://localhost:9200"]
         username = "admin"
         password = "admin"
-        
+
         tls_keystore_path = "${your Easysearch home}/config/certs/http.p12"
         tls_keystore_password = "${your password}"
     }
 }
 ```
 
+SAVE_MODE
+
+```hocon
+sink {
+    Easysearch {
+        hosts = ["https://localhost:9200"]
+        username = "admin"
+        password = "admin"
+
+        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+        data_save_mode = "APPEND_DATA"
+    }
+}
+```
+
 ## Changelog
 
 <ChangeLog />
diff --git a/docs/en/connector-v2/sink/Elasticsearch.md 
b/docs/en/connector-v2/sink/Elasticsearch.md
index 3e9fce0e31..362866db67 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -220,7 +220,7 @@ sink {
 }
 ```
 
-SAVE_MODE (Add saveMode function)
+SAVE_MODE
 
 ```hocon
 sink {
diff --git a/docs/zh/connector-v2/sink/Easysearch.md 
b/docs/zh/connector-v2/sink/Easysearch.md
index 72d75fd71c..8628f81623 100644
--- a/docs/zh/connector-v2/sink/Easysearch.md
+++ b/docs/zh/connector-v2/sink/Easysearch.md
@@ -61,6 +61,8 @@ import ChangeLog from '../changelog/connector-easysearch.md';
 | tls_keystore_password   | string  | 否 | -             |
 | tls_truststore_path     | string  | 否 | -             |
 | tls_truststore_password | string  | 否 | -             |
+| schema_save_mode        | enum    | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode          | enum    | 否 | APPEND_DATA   |
 | common-options          |         | 否 | -             |
 
 ### hosts [array]
@@ -120,6 +122,21 @@ PEM或JKS信任存储的路径。运行SeaTunnel的操作系统用户必须能
 
 指定的信任存储的密钥密码
 
+### schema_save_mode [enum]
+
+在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案:
+- `RECREATE_SCHEMA`:当表不存在时会创建,当表已存在时会删除并重建
+- `CREATE_SCHEMA_WHEN_NOT_EXIST`:当表不存在时会创建,当表已存在时则跳过创建
+- `ERROR_WHEN_SCHEMA_NOT_EXIST`:当表不存在时将抛出错误
+- `IGNORE`:忽略对表的处理
+
+### data_save_mode [enum]
+
+在启动同步任务之前,针对目标端已有的数据选择不同的处理方案:
+- `DROP_DATA`:保留数据库结构并删除数据
+- `APPEND_DATA`:保留数据库结构,保留数据
+- `ERROR_WHEN_DATA_EXISTS`:有数据时报错
+
 ### common options
 
 接收器插件常用参数,详见 [Sink Common Options](../sink-common-options.md)
@@ -144,7 +161,7 @@ sink {
     Easysearch {
         hosts = ["localhost:9200"]
         index = "seatunnel-${age}"
-        
+
         # cdc required options
         primary_keys = ["key1", "key2", ...]
     }
@@ -159,7 +176,7 @@ sink {
         hosts = ["https://localhost:9200"]
         username = "admin"
         password = "admin"
-        
+
         tls_verify_certificate = false
     }
 }
@@ -173,7 +190,7 @@ sink {
         hosts = ["https://localhost:9200"]
         username = "admin"
         password = "admin"
-        
+
         tls_verify_hostname = false
     }
 }
@@ -187,13 +204,28 @@ sink {
         hosts = ["https://localhost:9200"]
         username = "admin"
         password = "admin"
-        
+
         tls_keystore_path = "${your Easysearch home}/config/certs/http.p12"
         tls_keystore_password = "${your password}"
     }
 }
 ```
 
+配置表生成策略
+
+```hocon
+sink {
+    Easysearch {
+        hosts = ["https://localhost:9200"]
+        username = "admin"
+        password = "admin"
+
+        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+        data_save_mode = "APPEND_DATA"
+    }
+}
+```
+
 ## 变更日志
 
 <ChangeLog />
diff --git a/docs/zh/connector-v2/sink/Elasticsearch.md 
b/docs/zh/connector-v2/sink/Elasticsearch.md
index 974d3f3a0c..55479940c4 100644
--- a/docs/zh/connector-v2/sink/Elasticsearch.md
+++ b/docs/zh/connector-v2/sink/Elasticsearch.md
@@ -222,7 +222,7 @@ sink {
 }
 ```
 
-配置表生成策略 (schema_save_mode)
+配置表生成策略
 
 通过设置 `schema_save_mode` 配置为 `CREATE_SCHEMA_WHEN_NOT_EXIST` 来支持不存在表时创建表
 
diff --git a/docs/zh/connector-v2/sink/Jdbc.md 
b/docs/zh/connector-v2/sink/Jdbc.md
index 834c0579e0..053d48aa55 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -317,7 +317,7 @@ sink {
 }
 ```
 
-配置表生成策略 (schema_save_mode)
+配置表生成策略
 
 通过设置 `schema_save_mode` 配置为 `CREATE_SCHEMA_WHEN_NOT_EXIST` 来支持不存在表时创建表
 
diff --git 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java
 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java
index d321dfd802..18e3fd8dc7 100644
--- 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java
@@ -23,7 +23,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigUtil;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -39,6 +41,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.IndexDocs
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -51,6 +55,7 @@ import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
  *
  * <p>In Easysearch, we use the index as the database and table.
  */
+@Slf4j
 public class EasysearchCatalog implements Catalog {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(EasysearchCatalog.class);
@@ -104,14 +109,14 @@ public class EasysearchCatalog implements Catalog {
     public boolean databaseExists(String databaseName) throws CatalogException 
{
         // check if the index exist
         try {
-            List<IndexDocsCount> indexDocsCount = 
ezsClient.getIndexDocsCount(databaseName);
-            return true;
+            return ezsClient.checkIndexExist(databaseName);
         } catch (Exception e) {
-            throw new CatalogException(
+            log.error(
                     String.format(
                             "Failed to check if catalog %s database %s exists",
                             catalogName, databaseName),
                     e);
+            return false;
         }
     }
 
@@ -224,6 +229,59 @@ public class EasysearchCatalog implements Catalog {
         }
     }
 
+    /**
+     * Empties an index by dropping it and then creating it again under the same name.
+     *
+     * <p>The index is recreated without a {@link CatalogTable} ({@code null} is passed to
+     * {@code createTable}). Any exception raised by either step is wrapped in a
+     * {@link CatalogException}.
+     *
+     * @param tablePath path whose table name identifies the index
+     * @param ignoreIfNotExists forwarded unchanged to {@code dropTable}
+     */
+    @Override
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
+        try {
+            // Step 1: remove the existing index; step 2: create it again.
+            dropTable(tablePath, ignoreIfNotExists);
+            createTable(tablePath, null, false);
+        } catch (Exception cause) {
+            final String message =
+                    String.format(
+                            "Failed to truncate table %s in catalog %s",
+                            tablePath.getTableName(), catalogName);
+            throw new CatalogException(message, cause);
+        }
+    }
+
+    /**
+     * Reports whether the index behind {@code tablePath} currently holds at least one document.
+     *
+     * <p>A missing index counts as "no data". This is a best-effort check: if the lookup fails
+     * for any reason the failure is logged and {@code false} is returned, so callers never see
+     * an exception from this method.
+     *
+     * @param tablePath path whose table name identifies the index
+     * @return {@code true} only when the index exists and its first reported document count is
+     *     greater than zero
+     */
+    @Override
+    public boolean isExistsData(TablePath tablePath) {
+        try {
+            // First check if the index exists
+            if (!ezsClient.checkIndexExist(tablePath.getTableName())) {
+                return false;
+            }
+
+            // Then check if it has documents
+            final List<IndexDocsCount> indexDocsCount =
+                    ezsClient.getIndexDocsCount(tablePath.getTableName());
+            return !indexDocsCount.isEmpty() && indexDocsCount.get(0).getDocsCount() > 0;
+        } catch (Exception e) {
+            // Keep the best-effort contract (return false), but do not hide the failure:
+            // a connection or permission problem would otherwise look like an empty index.
+            log.warn(
+                    "Failed to check whether table {} in catalog {} contains data; assuming it is empty",
+                    tablePath,
+                    catalogName,
+                    e);
+            return false;
+        }
+    }
+
+    /**
+     * Describes, as plain text, what the given catalog action would do to the index.
+     *
+     * <p>Table-level and database-level create/drop actions produce the same text, and a
+     * truncate is described as a delete followed by a create.
+     *
+     * @param actionType the action to preview
+     * @param tablePath path whose table name identifies the index
+     * @param catalogTable not used by this implementation
+     * @return an {@link InfoPreviewResult} carrying the description
+     * @throws UnsupportedOperationException for any other action type
+     */
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType,
+            TablePath tablePath,
+            java.util.Optional<CatalogTable> catalogTable) {
+        final String prefix;
+        if (actionType == ActionType.CREATE_TABLE || actionType == ActionType.CREATE_DATABASE) {
+            prefix = "create index ";
+        } else if (actionType == ActionType.DROP_TABLE
+                || actionType == ActionType.DROP_DATABASE) {
+            prefix = "delete index ";
+        } else if (actionType == ActionType.TRUNCATE_TABLE) {
+            prefix = "delete and create index ";
+        } else {
+            // Rejected before tablePath is touched, exactly like the unsupported branch before.
+            throw new UnsupportedOperationException("Unsupported action type: " + actionType);
+        }
+        return new InfoPreviewResult(prefix + tablePath.getTableName());
+    }
+
     private Map<String, String> buildTableOptions(TablePath tablePath) {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "easysearch");
diff --git 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalogFactory.java
index 17ccab123d..147078a41f 100644
--- 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalogFactory.java
@@ -21,24 +21,25 @@ import 
org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
 
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
 public class EasysearchCatalogFactory implements CatalogFactory {
 
     @Override
     public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
-        // todo:
-        return null;
+        return new EasysearchCatalog(catalogName, "", options);
     }
 
     @Override
     public String factoryIdentifier() {
-        // todo:
         return "Easysearch";
     }
 
     @Override
     public OptionRule optionRule() {
-        // todo:
-        return null;
+        return OptionRule.builder().build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
index 7679b16545..df3153603e 100644
--- 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
+++ 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
@@ -428,6 +428,29 @@ public class EasysearchClient {
         }
     }
 
+    /**
+     * Checks whether an index exists by sending a {@code HEAD /<index>} request.
+     *
+     * <p>Use this instead of {@code getIndexDocsCount} for existence checks, because
+     * {@code getIndexDocsCount} throws an exception if the index does not exist.
+     *
+     * @param index index name; it is lower-cased before being placed in the request path
+     * @return {@code true} only when the response status code is 200; {@code false} for any
+     *     other status code or when performing the request throws an exception
+     */
+    public boolean checkIndexExist(String index) {
+        // Built outside the try block so that problems with the argument itself still surface.
+        final Request headRequest = new Request("HEAD", "/" + index.toLowerCase());
+        try {
+            return restClient.performRequest(headRequest).getStatusLine().getStatusCode() == 200;
+        } catch (Exception ex) {
+            return false;
+        }
+    }
+
     public List<String> listIndex() {
         String endpoint = "/_cat/indices?format=json";
         Request request = new Request("GET", endpoint);
diff --git 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/config/EasysearchSinkOptions.java
 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/config/EasysearchSinkOptions.java
index 58ace8015d..a7d0535b2f 100644
--- 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/config/EasysearchSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/config/EasysearchSinkOptions.java
@@ -19,9 +19,16 @@ package 
org.apache.seatunnel.connectors.seatunnel.easysearch.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
+import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static 
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
 public class EasysearchSinkOptions extends EasysearchSinkCommonOptions {
 
     public static final Option<List<String>> PRIMARY_KEYS =
@@ -48,4 +55,18 @@ public class EasysearchSinkOptions extends 
EasysearchSinkCommonOptions {
                     .intType()
                     .defaultValue(3)
                     .withDescription("one bulk request max try count");
+
+    /**
+     * Sink option {@code schema_save_mode}: any {@link SchemaSaveMode} value is accepted and
+     * the default is {@code CREATE_SCHEMA_WHEN_NOT_EXIST}.
+     */
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription("schema_save_mode");
+
+    /**
+     * Sink option {@code data_save_mode}: a single choice among {@code DROP_DATA},
+     * {@code APPEND_DATA} and {@code ERROR_WHEN_DATA_EXISTS}; the default is
+     * {@code APPEND_DATA}.
+     */
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .singleChoice(
+                            DataSaveMode.class,
+                            Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
+                    .defaultValue(APPEND_DATA)
+                    .withDescription("data_save_mode");
 }
diff --git 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
index 2de0d30f7e..e47a5acbfe 100644
--- 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
@@ -18,22 +18,40 @@
 package org.apache.seatunnel.connectors.seatunnel.easysearch.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.source.SupportSchemaEvolution;
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.schema.SchemaChangeType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.easysearch.catalog.EasysearchCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchSinkState;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 
+import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+
 public class EasysearchSink
         implements SeaTunnelSink<
-                SeaTunnelRow,
-                EasysearchSinkState,
-                EasysearchCommitInfo,
-                EasysearchAggregatedCommitInfo> {
+                        SeaTunnelRow,
+                        EasysearchSinkState,
+                        EasysearchCommitInfo,
+                        EasysearchAggregatedCommitInfo>,
+                SupportSchemaEvolution,
+                SupportSaveMode {
 
     private final ReadonlyConfig pluginConfig;
     private final CatalogTable catalogTable;
@@ -58,4 +76,35 @@ public class EasysearchSink
     public Optional<CatalogTable> getWriteCatalogTable() {
         return SeaTunnelSink.super.getWriteCatalogTable();
     }
+
+    /**
+     * Builds the save-mode handler for this sink.
+     *
+     * <p>A {@link CatalogFactory} is looked up by plugin name through {@code discoverFactory};
+     * if that yields {@code null}, an {@link EasysearchCatalogFactory} is used instead. The
+     * catalog it creates is combined with the configured schema and data save modes into a
+     * {@link DefaultSaveModeHandler}.
+     *
+     * @return a non-empty {@link Optional} holding the handler
+     */
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        final CatalogFactory discovered =
+                discoverFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        CatalogFactory.class,
+                        getPluginName());
+
+        // If no CatalogFactory is found, use our EasysearchCatalogFactory directly
+        final CatalogFactory factory =
+                discovered != null ? discovered : new EasysearchCatalogFactory();
+        final Catalog easysearchCatalog =
+                factory.createCatalog(factory.factoryIdentifier(), pluginConfig);
+
+        final SchemaSaveMode schemaMode = pluginConfig.get(EasysearchSinkOptions.SCHEMA_SAVE_MODE);
+        final DataSaveMode dataMode = pluginConfig.get(EasysearchSinkOptions.DATA_SAVE_MODE);
+
+        // Use the index name directly as both database and table name for Easysearch
+        final String index = catalogTable.getTableId().getTableName();
+        final SaveModeHandler handler =
+                new DefaultSaveModeHandler(
+                        schemaMode,
+                        dataMode,
+                        easysearchCatalog,
+                        TablePath.of(index, index),
+                        null,
+                        null);
+        return Optional.of(handler);
+    }
+
+    /**
+     * Lists the schema change types this sink declares support for.
+     *
+     * @return a list containing only {@code ADD_COLUMN}
+     */
+    @Override
+    public List<SchemaChangeType> supports() {
+        final SchemaChangeType[] supported = {SchemaChangeType.ADD_COLUMN};
+        return Arrays.asList(supported);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java
 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java
index 861b6f41eb..5ab636605e 100644
--- 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java
@@ -49,7 +49,9 @@ public class EasysearchSinkFactory implements 
TableSinkFactory {
                         EasysearchSinkOptions.TLS_KEY_STORE_PATH,
                         EasysearchSinkOptions.TLS_KEY_STORE_PASSWORD,
                         EasysearchSinkOptions.TLS_TRUST_STORE_PATH,
-                        EasysearchSinkOptions.TLS_TRUST_STORE_PASSWORD)
+                        EasysearchSinkOptions.TLS_TRUST_STORE_PASSWORD,
+                        EasysearchSinkOptions.SCHEMA_SAVE_MODE,
+                        EasysearchSinkOptions.DATA_SAVE_MODE)
                 .build();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
index e276214ba9..dce6cbfffd 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
@@ -31,12 +31,11 @@ import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
 import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.easysearch.catalog.EasysearchCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.easysearch.client.EasysearchClient;
+import 
org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.IndexDocsCount;
 import 
org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.ScrollResult;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
@@ -146,10 +145,6 @@ public class EasysearchIT extends TestSuiteBase implements 
TestResource {
         easysearchClient.bulk(requestBody.toString());
     }
 
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.SPARK, EngineType.FLINK},
-            disabledReason = "Test only one engine for first change")
     @TestTemplate
     public void testEasysearch(TestContainer container) throws IOException, 
InterruptedException {
         Container.ExecResult execResult =
@@ -160,6 +155,85 @@ public class EasysearchIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
     }
 
+    /**
+     * Runs the save-mode job (schema_save_mode = CREATE_SCHEMA_WHEN_NOT_EXIST) and verifies
+     * that the sink index exists and holds the expected documents.
+     */
+    @TestTemplate
+    public void testEasysearchWithSaveMode(TestContainer container)
+            throws IOException, InterruptedException {
+        final String targetIndex = "st_index_save_mode";
+
+        // Test CREATE_SCHEMA_WHEN_NOT_EXIST mode
+        Container.ExecResult jobResult =
+                container.executeJob("/easysearch/easysearch_source_and_sink_with_save_mode.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        // Wait for index refresh
+        Thread.sleep(2000);
+
+        // The job must have created the index: the docs-count lookup has to succeed and
+        // return at least one entry.
+        try {
+            List<IndexDocsCount> counts = easysearchClient.getIndexDocsCount(targetIndex);
+            Assertions.assertFalse(counts.isEmpty(), "Index should exist");
+        } catch (Exception e) {
+            Assertions.fail("Index should exist but got exception: " + e.getMessage());
+        }
+
+        // The written documents must match the source rows selected by the DSL
+        // {"range":{"c_int":{"gte":10,"lte":20}}}
+        List<String> written = readSinkDataFromIndex(targetIndex);
+        Assertions.assertIterableEquals(mapTestDatasetForDSL(), written);
+    }
+
+    /**
+     * Reads back the documents of {@code indexName} whose {@code c_int} lies in [10, 20] and
+     * returns them as JSON strings ordered by {@code c_int}.
+     *
+     * <p>The {@code _index}, {@code _type} and {@code _id} entries are removed from each
+     * document, and {@code c_timestamp} is replaced by its epoch-millisecond value (the parsed
+     * local date-time is interpreted at UTC).
+     *
+     * @param indexName index to read from
+     * @return one JSON string per matching document
+     * @throws InterruptedException if the initial wait is interrupted
+     */
+    private List<String> readSinkDataFromIndex(String indexName) throws InterruptedException {
+        // wait for index refresh
+        Thread.sleep(2000);
+
+        List<String> fields =
+                Lists.newArrayList(
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp");
+
+        // Builds the DSL {"range": {"c_int": {"gte": 10, "lte": 20}}}
+        HashMap<String, Object> bounds = new HashMap<>();
+        bounds.put("gte", 10);
+        bounds.put("lte", 20);
+        HashMap<String, Object> rangeClause = new HashMap<>();
+        rangeClause.put("c_int", bounds);
+        Map<String, Object> dsl = new HashMap<>();
+        dsl.put("range", rangeClause);
+
+        ScrollResult result = easysearchClient.searchByScroll(indexName, fields, dsl, "1m", 1000);
+
+        // Strip the metadata entries and normalise the timestamp in place.
+        result.getDocs()
+                .forEach(
+                        doc -> {
+                            doc.remove("_index");
+                            doc.remove("_type");
+                            doc.remove("_id");
+                            long epochMillis =
+                                    LocalDateTime.parse(doc.get("c_timestamp").toString())
+                                            .toInstant(ZoneOffset.UTC)
+                                            .toEpochMilli();
+                            doc.replace("c_timestamp", epochMillis);
+                        });
+
+        return result.getDocs().stream()
+                .sorted(
+                        Comparator.comparingInt(
+                                doc -> Integer.parseInt(doc.get("c_int").toString())))
+                .map(JsonUtils::toJsonString)
+                .collect(Collectors.toList());
+    }
+
     @TestTemplate
     @Disabled("Easysearch catalog not yet realized, see 
EasysearchCatalogFactory.class")
     public void testCatalog(TestContainer container) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/resources/easysearch/easysearch_source_and_sink_with_save_mode.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/resources/easysearch/easysearch_source_and_sink_with_save_mode.conf
new file mode 100644
index 0000000000..540f058049
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/resources/easysearch/easysearch_source_and_sink_with_save_mode.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Easysearch {
+    hosts = ["https://e2e_easysearch:9200"]
+    username = "admin"
+    password = "admin"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "st_index"
+    query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+    schema = {
+      fields {
+        c_map = "map<string, tinyint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+sink {
+  Easysearch {
+    hosts = ["https://e2e_easysearch:9200"]
+    username = "admin"
+    password = "admin"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "st_index_save_mode"
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}

Reply via email to