This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3ceb57f279 [improve] EasySearch support
schema_save_mode/data_save_mode (#9310)
3ceb57f279 is described below
commit 3ceb57f279d834797e8ce407e7932fec4c2eb7a3
Author: CosmosNi <[email protected]>
AuthorDate: Sat May 17 09:41:05 2025 +0800
[improve] EasySearch support schema_save_mode/data_save_mode (#9310)
---
docs/en/connector-v2/sink/Easysearch.md | 40 +++++++++-
docs/en/connector-v2/sink/Elasticsearch.md | 2 +-
docs/zh/connector-v2/sink/Easysearch.md | 40 +++++++++-
docs/zh/connector-v2/sink/Elasticsearch.md | 2 +-
docs/zh/connector-v2/sink/Jdbc.md | 2 +-
.../easysearch/catalog/EasysearchCatalog.java | 64 +++++++++++++++-
.../catalog/EasysearchCatalogFactory.java | 11 +--
.../easysearch/client/EasysearchClient.java | 23 ++++++
.../easysearch/config/EasysearchSinkOptions.java | 21 ++++++
.../seatunnel/easysearch/sink/EasysearchSink.java | 57 +++++++++++++-
.../easysearch/sink/EasysearchSinkFactory.java | 4 +-
.../e2e/connector/easysearch/EasysearchIT.java | 86 ++++++++++++++++++++--
.../easysearch_source_and_sink_with_save_mode.conf | 75 +++++++++++++++++++
13 files changed, 397 insertions(+), 30 deletions(-)
diff --git a/docs/en/connector-v2/sink/Easysearch.md
b/docs/en/connector-v2/sink/Easysearch.md
index 2c8c7ce4f4..cb819eee5d 100644
--- a/docs/en/connector-v2/sink/Easysearch.md
+++ b/docs/en/connector-v2/sink/Easysearch.md
@@ -61,6 +61,8 @@ Engine Supported
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
| tls_truststore_password | string | no | - |
+| schema_save_mode | enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode | enum | no | APPEND_DATA |
| common-options | | no | - |
### hosts [array]
@@ -120,6 +122,21 @@ The path to PEM or JKS trust store. This file must be
readable by the operating
The key password for the trust store specified
+### schema_save_mode [enum]
+
+Choose how to handle the target-side schema before starting the
synchronization task:
+- `RECREATE_SCHEMA`: Creates the table if it doesn't exist, and deletes and
recreates it if it does.
+- `CREATE_SCHEMA_WHEN_NOT_EXIST`: Creates the table if it doesn't exist, skips
creation if it does.
+- `ERROR_WHEN_SCHEMA_NOT_EXIST`: Throws an error if the table doesn't exist.
+- `IGNORE`: Ignores schema handling.
+
+### data_save_mode [enum]
+
+Choose how to handle the target-side data before starting the synchronization
task:
+- `DROP_DATA`: Preserves the database structure and deletes the data.
+- `APPEND_DATA`: Preserves the database structure and the data.
+- `ERROR_WHEN_DATA_EXISTS`: Reports an error when data exists.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details
@@ -144,7 +161,7 @@ sink {
Easysearch {
hosts = ["localhost:9200"]
index = "seatunnel-${age}"
-
+
# cdc required options
primary_keys = ["key1", "key2", ...]
}
@@ -159,7 +176,7 @@ sink {
hosts = ["https://localhost:9200"]
username = "admin"
password = "admin"
-
+
tls_verify_certificate = false
}
}
@@ -173,7 +190,7 @@ sink {
hosts = ["https://localhost:9200"]
username = "admin"
password = "admin"
-
+
tls_verify_hostname = false
}
}
@@ -187,13 +204,28 @@ sink {
hosts = ["https://localhost:9200"]
username = "admin"
password = "admin"
-
+
tls_keystore_path = "${your Easysearch home}/config/certs/http.p12"
tls_keystore_password = "${your password}"
}
}
```
+SAVE_MODE
+
+```hocon
+sink {
+ Easysearch {
+ hosts = ["https://localhost:9200"]
+ username = "admin"
+ password = "admin"
+
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
+
## Changelog
<ChangeLog />
diff --git a/docs/en/connector-v2/sink/Elasticsearch.md
b/docs/en/connector-v2/sink/Elasticsearch.md
index 3e9fce0e31..362866db67 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -220,7 +220,7 @@ sink {
}
```
-SAVE_MODE (Add saveMode function)
+SAVE_MODE
```hocon
sink {
diff --git a/docs/zh/connector-v2/sink/Easysearch.md
b/docs/zh/connector-v2/sink/Easysearch.md
index 72d75fd71c..8628f81623 100644
--- a/docs/zh/connector-v2/sink/Easysearch.md
+++ b/docs/zh/connector-v2/sink/Easysearch.md
@@ -61,6 +61,8 @@ import ChangeLog from '../changelog/connector-easysearch.md';
| tls_keystore_password | string | 否 | - |
| tls_truststore_path | string | 否 | - |
| tls_truststore_password | string | 否 | - |
+| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
+| data_save_mode | enum | 否 | APPEND_DATA |
| common-options | | 否 | - |
### hosts [array]
@@ -120,6 +122,21 @@ PEM或JKS信任存储的路径。运行SeaTunnel的操作系统用户必须能
指定的信任存储的密钥密码
+### schema_save_mode [enum]
+
+在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案:
+- `RECREATE_SCHEMA`:当表不存在时会创建,当表已存在时会删除并重建
+- `CREATE_SCHEMA_WHEN_NOT_EXIST`:当表不存在时会创建,当表已存在时则跳过创建
+- `ERROR_WHEN_SCHEMA_NOT_EXIST`:当表不存在时将抛出错误
+- `IGNORE`:忽略对表的处理
+
+### data_save_mode [enum]
+
+在启动同步任务之前,针对目标端已有的数据选择不同的处理方案:
+- `DROP_DATA`:保留数据库结构并删除数据
+- `APPEND_DATA`:保留数据库结构,保留数据
+- `ERROR_WHEN_DATA_EXISTS`:有数据时报错
+
### common options
接收器插件常用参数,详见 [Sink Common Options](../sink-common-options.md)
@@ -144,7 +161,7 @@ sink {
Easysearch {
hosts = ["localhost:9200"]
index = "seatunnel-${age}"
-
+
# cdc required options
primary_keys = ["key1", "key2", ...]
}
@@ -159,7 +176,7 @@ sink {
hosts = ["https://localhost:9200"]
username = "admin"
password = "admin"
-
+
tls_verify_certificate = false
}
}
@@ -173,7 +190,7 @@ sink {
hosts = ["https://localhost:9200"]
username = "admin"
password = "admin"
-
+
tls_verify_hostname = false
}
}
@@ -187,13 +204,28 @@ sink {
hosts = ["https://localhost:9200"]
username = "admin"
password = "admin"
-
+
tls_keystore_path = "${your Easysearch home}/config/certs/http.p12"
tls_keystore_password = "${your password}"
}
}
```
+配置表生成策略
+
+```hocon
+sink {
+ Easysearch {
+ hosts = ["https://localhost:9200"]
+ username = "admin"
+ password = "admin"
+
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
+
## 变更日志
<ChangeLog />
diff --git a/docs/zh/connector-v2/sink/Elasticsearch.md
b/docs/zh/connector-v2/sink/Elasticsearch.md
index 974d3f3a0c..55479940c4 100644
--- a/docs/zh/connector-v2/sink/Elasticsearch.md
+++ b/docs/zh/connector-v2/sink/Elasticsearch.md
@@ -222,7 +222,7 @@ sink {
}
```
-配置表生成策略 (schema_save_mode)
+配置表生成策略
通过设置 `schema_save_mode` 配置为 `CREATE_SCHEMA_WHEN_NOT_EXIST` 来支持不存在表时创建表
diff --git a/docs/zh/connector-v2/sink/Jdbc.md
b/docs/zh/connector-v2/sink/Jdbc.md
index 834c0579e0..053d48aa55 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -317,7 +317,7 @@ sink {
}
```
-配置表生成策略 (schema_save_mode)
+配置表生成策略
通过设置 `schema_save_mode` 配置为 `CREATE_SCHEMA_WHEN_NOT_EXIST` 来支持不存在表时创建表
diff --git
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java
index d321dfd802..18e3fd8dc7 100644
---
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java
+++
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalog.java
@@ -23,7 +23,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigUtil;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -39,6 +41,8 @@ import
org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.IndexDocs
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -51,6 +55,7 @@ import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
*
* <p>In Easysearch, we use the index as the database and table.
*/
+@Slf4j
public class EasysearchCatalog implements Catalog {
private static final Logger LOGGER =
LoggerFactory.getLogger(EasysearchCatalog.class);
@@ -104,14 +109,14 @@ public class EasysearchCatalog implements Catalog {
public boolean databaseExists(String databaseName) throws CatalogException
{
// check if the index exist
try {
- List<IndexDocsCount> indexDocsCount =
ezsClient.getIndexDocsCount(databaseName);
- return true;
+ return ezsClient.checkIndexExist(databaseName);
} catch (Exception e) {
- throw new CatalogException(
+ log.error(
String.format(
"Failed to check if catalog %s database %s exists",
catalogName, databaseName),
e);
+ return false;
}
}
@@ -224,6 +229,59 @@ public class EasysearchCatalog implements Catalog {
}
}
+ @Override
+ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
+ // Delete and recreate the index
+ try {
+ dropTable(tablePath, ignoreIfNotExists);
+ createTable(tablePath, null, false);
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to truncate table %s in catalog %s",
+ tablePath.getTableName(), catalogName),
+ e);
+ }
+ }
+
+ @Override
+ public boolean isExistsData(TablePath tablePath) {
+ try {
+ // First check if the index exists
+ if (!ezsClient.checkIndexExist(tablePath.getTableName())) {
+ return false;
+ }
+
+ // Then check if it has documents
+ final List<IndexDocsCount> indexDocsCount =
+ ezsClient.getIndexDocsCount(tablePath.getTableName());
+ return !indexDocsCount.isEmpty() &&
indexDocsCount.get(0).getDocsCount() > 0;
+ } catch (Exception e) {
+ // If any error occurs, return false
+ return false;
+ }
+ }
+
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType,
+ TablePath tablePath,
+ java.util.Optional<CatalogTable> catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ return new InfoPreviewResult("create index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new InfoPreviewResult("delete index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.TRUNCATE_TABLE) {
+ return new InfoPreviewResult("delete and create index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.CREATE_DATABASE) {
+ return new InfoPreviewResult("create index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.DROP_DATABASE) {
+ return new InfoPreviewResult("delete index " +
tablePath.getTableName());
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type:
" + actionType);
+ }
+ }
+
private Map<String, String> buildTableOptions(TablePath tablePath) {
Map<String, String> options = new HashMap<>();
options.put("connector", "easysearch");
diff --git
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalogFactory.java
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalogFactory.java
index 17ccab123d..147078a41f 100644
---
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/catalog/EasysearchCatalogFactory.java
@@ -21,24 +21,25 @@ import
org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
public class EasysearchCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
- // todo:
- return null;
+ return new EasysearchCatalog(catalogName, "", options);
}
@Override
public String factoryIdentifier() {
- // todo:
return "Easysearch";
}
@Override
public OptionRule optionRule() {
- // todo:
- return null;
+ return OptionRule.builder().build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
index 7679b16545..df3153603e 100644
---
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
+++
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/client/EasysearchClient.java
@@ -428,6 +428,29 @@ public class EasysearchClient {
}
}
+ /**
+ * Use this method instead of getIndexDocsCount to determine if the index
exists.
+ *
+ * <p>
+ *
+ * <p>getIndexDocsCount throws an exception if the index does not exist
+ *
+ * <p>
+ *
+ * @param index index
+ * @return true or false
+ */
+ public boolean checkIndexExist(String index) {
+ Request request = new Request("HEAD", "/" + index.toLowerCase());
+ try {
+ Response response = restClient.performRequest(request);
+ int statusCode = response.getStatusLine().getStatusCode();
+ return statusCode == 200;
+ } catch (Exception ex) {
+ return false;
+ }
+ }
+
public List<String> listIndex() {
String endpoint = "/_cat/indices?format=json";
Request request = new Request("GET", endpoint);
diff --git
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/config/EasysearchSinkOptions.java
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/config/EasysearchSinkOptions.java
index 58ace8015d..a7d0535b2f 100644
---
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/config/EasysearchSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/config/EasysearchSinkOptions.java
@@ -19,9 +19,16 @@ package
org.apache.seatunnel.connectors.seatunnel.easysearch.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import java.util.Arrays;
import java.util.List;
+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
public class EasysearchSinkOptions extends EasysearchSinkCommonOptions {
public static final Option<List<String>> PRIMARY_KEYS =
@@ -48,4 +55,18 @@ public class EasysearchSinkOptions extends
EasysearchSinkCommonOptions {
.intType()
.defaultValue(3)
.withDescription("one bulk request max try count");
+
+ public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ Options.key("schema_save_mode")
+ .enumType(SchemaSaveMode.class)
+ .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+ .withDescription("schema_save_mode");
+
+ public static final Option<DataSaveMode> DATA_SAVE_MODE =
+ Options.key("data_save_mode")
+ .singleChoice(
+ DataSaveMode.class,
+ Arrays.asList(DROP_DATA, APPEND_DATA,
ERROR_WHEN_DATA_EXISTS))
+ .defaultValue(APPEND_DATA)
+ .withDescription("data_save_mode");
}
diff --git
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
index 2de0d30f7e..e47a5acbfe 100644
---
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
+++
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
@@ -18,22 +18,40 @@
package org.apache.seatunnel.connectors.seatunnel.easysearch.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.source.SupportSchemaEvolution;
+import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.schema.SchemaChangeType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.easysearch.catalog.EasysearchCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.easysearch.config.EasysearchSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchSinkState;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
+import static
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+
public class EasysearchSink
implements SeaTunnelSink<
- SeaTunnelRow,
- EasysearchSinkState,
- EasysearchCommitInfo,
- EasysearchAggregatedCommitInfo> {
+ SeaTunnelRow,
+ EasysearchSinkState,
+ EasysearchCommitInfo,
+ EasysearchAggregatedCommitInfo>,
+ SupportSchemaEvolution,
+ SupportSaveMode {
private final ReadonlyConfig pluginConfig;
private final CatalogTable catalogTable;
@@ -58,4 +76,35 @@ public class EasysearchSink
public Optional<CatalogTable> getWriteCatalogTable() {
return SeaTunnelSink.super.getWriteCatalogTable();
}
+
+ @Override
+ public Optional<SaveModeHandler> getSaveModeHandler() {
+ CatalogFactory catalogFactory =
+ discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ CatalogFactory.class,
+ getPluginName());
+
+ Catalog catalog;
+ if (catalogFactory == null) {
+ // If no CatalogFactory is found, use our EasysearchCatalogFactory
directly
+ catalogFactory = new EasysearchCatalogFactory();
+ }
+
+ catalog =
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), pluginConfig);
+ SchemaSaveMode schemaSaveMode =
pluginConfig.get(EasysearchSinkOptions.SCHEMA_SAVE_MODE);
+ DataSaveMode dataSaveMode =
pluginConfig.get(EasysearchSinkOptions.DATA_SAVE_MODE);
+
+ // Use the index name directly as both database and table name for
Easysearch
+ String indexName = catalogTable.getTableId().getTableName();
+ TablePath tablePath = TablePath.of(indexName, indexName);
+ return Optional.of(
+ new DefaultSaveModeHandler(
+ schemaSaveMode, dataSaveMode, catalog, tablePath,
null, null));
+ }
+
+ @Override
+ public List<SchemaChangeType> supports() {
+ return Arrays.asList(SchemaChangeType.ADD_COLUMN);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java
index 861b6f41eb..5ab636605e 100644
---
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSinkFactory.java
@@ -49,7 +49,9 @@ public class EasysearchSinkFactory implements
TableSinkFactory {
EasysearchSinkOptions.TLS_KEY_STORE_PATH,
EasysearchSinkOptions.TLS_KEY_STORE_PASSWORD,
EasysearchSinkOptions.TLS_TRUST_STORE_PATH,
- EasysearchSinkOptions.TLS_TRUST_STORE_PASSWORD)
+ EasysearchSinkOptions.TLS_TRUST_STORE_PASSWORD,
+ EasysearchSinkOptions.SCHEMA_SAVE_MODE,
+ EasysearchSinkOptions.DATA_SAVE_MODE)
.build();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
index e276214ba9..dce6cbfffd 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/easysearch/EasysearchIT.java
@@ -31,12 +31,11 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.easysearch.catalog.EasysearchCatalog;
import
org.apache.seatunnel.connectors.seatunnel.easysearch.client.EasysearchClient;
+import
org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.IndexDocsCount;
import
org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
@@ -146,10 +145,6 @@ public class EasysearchIT extends TestSuiteBase implements
TestResource {
easysearchClient.bulk(requestBody.toString());
}
- @DisabledOnContainer(
- value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Test only one engine for first change")
@TestTemplate
public void testEasysearch(TestContainer container) throws IOException,
InterruptedException {
Container.ExecResult execResult =
@@ -160,6 +155,85 @@ public class EasysearchIT extends TestSuiteBase implements
TestResource {
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
+ @TestTemplate
+ public void testEasysearchWithSaveMode(TestContainer container)
+ throws IOException, InterruptedException {
+ // Test CREATE_SCHEMA_WHEN_NOT_EXIST mode
+ Container.ExecResult execResult =
+
container.executeJob("/easysearch/easysearch_source_and_sink_with_save_mode.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ // Wait for index refresh
+ Thread.sleep(2000);
+
+ // Verify the index was created
+ String indexName = "st_index_save_mode";
+ try {
+ List<IndexDocsCount> indexDocsCounts =
easysearchClient.getIndexDocsCount(indexName);
+ Assertions.assertFalse(indexDocsCounts.isEmpty(), "Index should
exist");
+ } catch (Exception e) {
+ Assertions.fail("Index should exist but got exception: " +
e.getMessage());
+ }
+
+ // Verify the data was written correctly
+ List<String> sinkData = readSinkDataFromIndex(indexName);
+ // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
+ Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
+ }
+
+ private List<String> readSinkDataFromIndex(String indexName) throws
InterruptedException {
+ // wait for index refresh
+ Thread.sleep(2000);
+ List<String> source =
+ Lists.newArrayList(
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp");
+ HashMap<String, Object> rangeParam = new HashMap<>();
+ rangeParam.put("gte", 10);
+ rangeParam.put("lte", 20);
+ HashMap<String, Object> range = new HashMap<>();
+ range.put("c_int", rangeParam);
+ Map<String, Object> query = new HashMap<>();
+ query.put("range", range);
+ ScrollResult scrollResult =
+ easysearchClient.searchByScroll(indexName, source, query,
"1m", 1000);
+ scrollResult
+ .getDocs()
+ .forEach(
+ x -> {
+ x.remove("_index");
+ x.remove("_type");
+ x.remove("_id");
+ // I don't know if converting the test cases in
this way complies with
+ // the CI specification
+ x.replace(
+ "c_timestamp",
+
LocalDateTime.parse(x.get("c_timestamp").toString())
+ .toInstant(ZoneOffset.UTC)
+ .toEpochMilli());
+ });
+ List<String> docs =
+ scrollResult.getDocs().stream()
+ .sorted(
+ Comparator.comparingInt(
+ o ->
Integer.valueOf(o.get("c_int").toString())))
+ .map(JsonUtils::toJsonString)
+ .collect(Collectors.toList());
+ return docs;
+ }
+
@TestTemplate
@Disabled("Easysearch catalog not yet realized, see
EasysearchCatalogFactory.class")
public void testCatalog(TestContainer container) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/resources/easysearch/easysearch_source_and_sink_with_save_mode.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/resources/easysearch/easysearch_source_and_sink_with_save_mode.conf
new file mode 100644
index 0000000000..540f058049
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-easysearch-e2e/src/test/resources/easysearch/easysearch_source_and_sink_with_save_mode.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Easysearch {
+ hosts = ["https://e2e_easysearch:9200"]
+ username = "admin"
+ password = "admin"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+
+ index = "st_index"
+ query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+ schema = {
+ fields {
+ c_map = "map<string, tinyint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+}
+sink {
+ Easysearch {
+ hosts = ["https://e2e_easysearch:9200"]
+ username = "admin"
+ password = "admin"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+
+ index = "st_index_save_mode"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}