This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 948d588d06 [Feature][connector-elasticsearch] elasticsearch source
support PIT (#9150)
948d588d06 is described below
commit 948d588d06c10d8def5d740ff164cadb380aee66
Author: CosmosNi <[email protected]>
AuthorDate: Wed Apr 16 09:32:55 2025 +0800
[Feature][connector-elasticsearch] elasticsearch source support PIT (#9150)
---
docs/en/connector-v2/source/Elasticsearch.md | 51 +++++-
docs/zh/connector-v2/source/Elasticsearch.md | 51 +++++-
.../elasticsearch/client/EsRestClient.java | 204 +++++++++++++++++++++
.../elasticsearch/config/ElasticsearchConfig.java | 12 ++
.../config/ElasticsearchSourceOptions.java | 24 ++-
...{SearchTypeEnum.java => SearchApiTypeEnum.java} | 9 +-
.../elasticsearch/config/SearchTypeEnum.java | 3 +
.../source/PointInTimeResult.java} | 32 +++-
.../exception/ElasticsearchConnectorErrorCode.java | 7 +-
.../elasticsearch/source/ElasticsearchSource.java | 11 ++
.../source/ElasticsearchSourceFactory.java | 8 +
.../source/ElasticsearchSourceReader.java | 124 +++++++++++--
.../connector/elasticsearch/ElasticsearchIT.java | 11 ++
.../elasticsearch_source_with_pit.conf | 82 +++++++++
14 files changed, 595 insertions(+), 34 deletions(-)
diff --git a/docs/en/connector-v2/source/Elasticsearch.md
b/docs/en/connector-v2/source/Elasticsearch.md
index 92299b7b2d..dff046767b 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -30,8 +30,9 @@ support version >= 2.x and <= 8.x.
| index_list | array | no | used to define a multiple
table task |
| source | array | no | -
|
| query | json | no | {"match_all": {}}
|
-| search_type | json | no | Search method,sql or
dsl,default dsl |
-| sql_query | json | no | sql query
|
+| search_type | enum | no | Query type, SQL or DSL,
default DSL |
+| search_api_type | enum | no | Pagination API type, SCROLL
or PIT, default SCROLL |
+| sql_query | json | no | SQL query, required when
search_type is SQL |
| scroll_time | string | no | 1m
|
| scroll_size | int | no | 100
|
| tls_verify_certificate | boolean | no | true
|
@@ -41,6 +42,8 @@ support version >= 2.x and <= 8.x.
| tls_keystore_password | string | no | -
|
| tls_truststore_path | string | no | -
|
| tls_truststore_password | string | no | -
|
+| pit_keep_alive | long | no | 60000 (1 minute)
|
+| pit_batch_size | int | no | 100
|
| common-options | | no | -
|
@@ -113,6 +116,22 @@ The path to PEM or JKS trust store. This file must be
readable by the operating
The key password for the trust store specified
+### search_type
+Query type, available values:
+- DSL: Use Domain Specific Language query (default)
+- SQL: Use SQL query
+
+### search_api_type
+Pagination API type, available values:
+- SCROLL: Use Scroll API for pagination (default)
+- PIT: Use Point in Time (PIT) API for pagination
+
+### pit_keep_alive [long]
+The amount of time (in milliseconds) for which the PIT should be keep alive
+
+### pit_batch_size [long]
+Maximum number of hits to be returned with each PIT search request
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
@@ -177,7 +196,7 @@ source {
c_date2,
c_null
]
-
+
}
]
@@ -214,7 +233,7 @@ source {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
-
+
tls_verify_certificate = false
}
}
@@ -228,7 +247,7 @@ source {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
-
+
tls_verify_hostname = false
}
}
@@ -242,7 +261,7 @@ source {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
-
+
tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
tls_keystore_password = "${your password}"
}
@@ -266,6 +285,26 @@ source {
}
```
+Demo7: PIT
+```hocon
+source {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+
+ index = "st_index"
+ query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+
+ # Use DSL query with PIT API
+ search_type = DSL
+ search_api_type = PIT
+ pit_keep_alive = 60000 # 1 minute in milliseconds
+ pit_batch_size = 100
+```
+
## Changelog
<ChangeLog />
\ No newline at end of file
diff --git a/docs/zh/connector-v2/source/Elasticsearch.md
b/docs/zh/connector-v2/source/Elasticsearch.md
index 9c9c6dd3cc..ae6ed421bd 100644
--- a/docs/zh/connector-v2/source/Elasticsearch.md
+++ b/docs/zh/connector-v2/source/Elasticsearch.md
@@ -28,8 +28,9 @@ import ChangeLog from
'../changelog/connector-elasticsearch.md';
| index_list | array | no | 用来定义多索引同步任务
|
| source | array | no | -
|
| query | json | no | {"match_all": {}}
|
-| search_type | json | no | 查询方式,sql或者dsl,默认 dsl
|
-| sql_query | json | no | sql 查询语句
|
+| search_type | enum | no | 查询类型,SQL 或 DSL,默认 DSL
|
+| search_api_type | enum | no | 分页 API 类型,SCROLL 或 PIT,默认
SCROLL |
+| sql_query | json | no | SQL 查询语句,当 search_type 为 SQL
时必须 |
| scroll_time | string | no | 1m
|
| scroll_size | int | no | 100
|
| tls_verify_certificate | boolean | no | true
|
@@ -39,6 +40,8 @@ import ChangeLog from
'../changelog/connector-elasticsearch.md';
| tls_keystore_password | string | no | -
|
| tls_truststore_path | string | no | -
|
| tls_truststore_password | string | no | -
|
+| pit_keep_alive | long | no | 60000 (1 minute)
|
+| pit_batch_size | int | no | 100
|
| common-options | | no | -
|
### hosts [array]
@@ -115,6 +118,22 @@ PEM 或 JKS 信任库的路径。该文件必须对运行 SeaTunnel 的操作系
指定信任库的密钥密码。
+### search_type
+查询类型,可选值:
+- DSL: 使用 Domain Specific Language 查询(默认)
+- SQL: 使用 SQL 查询
+
+### search_api_type
+分页 API 类型,可选值:
+- SCROLL: 使用 Scroll API 进行分页(默认)
+- PIT: 使用 Point in Time (PIT) API 进行分页
+
+### pit_keep_alive [long]
+PIT 应保持活动的时间量(以毫秒为单位)
+
+### pit_batch_size [long]
+每次 PIT 搜索请求返回的最大数量
+
### common options
Source 插件常用参数,具体请参考 [Source 常用选项](../source-common-options.md)
@@ -180,7 +199,7 @@ source {
c_date2,
c_null
]
-
+
}
]
@@ -215,7 +234,7 @@ source {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
-
+
tls_verify_certificate = false
}
}
@@ -229,7 +248,7 @@ source {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
-
+
tls_verify_hostname = false
}
}
@@ -243,7 +262,7 @@ source {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
-
+
tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
tls_keystore_password = "${your password}"
}
@@ -267,6 +286,26 @@ source {
}
```
+Demo7: PIT方式滚动查询
+```hocon
+source {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+
+ index = "st_index"
+ query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+
+ # 使用 DSL 查询和 PIT API
+ search_type = DSL
+ search_api_type = PIT
+ pit_keep_alive = 60000 # 1 minute in milliseconds
+ pit_batch_size = 100
+```
+
## 变更日志
<ChangeLog />
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index b2be5dac29..69c34faef6 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.Elasticsea
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.PointInTimeResult;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
@@ -61,6 +62,7 @@ import javax.net.ssl.SSLContext;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -876,4 +878,206 @@ public class EsRestClient implements Closeable {
ex);
}
}
+
+ /**
+ * Creates a Point-in-Time (PIT) for the specified index.
+ *
+ * @param index The index to create a PIT for
+ * @param keepAlive The time to keep the PIT alive (in milliseconds)
+ * @return The PIT ID
+ */
+ public String createPointInTime(String index, long keepAlive) {
+ String endpoint = String.format("/%s/_pit?keep_alive=%dms",
index.toLowerCase(), keepAlive);
+ Request request = new Request("POST", endpoint);
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.CREATE_PIT_FAILED,
+ "POST " + endpoint + " response null");
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String entity = EntityUtils.toString(response.getEntity());
+ JsonNode jsonNode = JsonUtils.parseObject(entity);
+ return jsonNode.get("id").asText();
+ } else {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.CREATE_PIT_FAILED,
+ String.format(
+ "POST %s response status code=%d",
+ endpoint,
response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.CREATE_PIT_FAILED, ex);
+ }
+ }
+
+ /**
+ * Deletes a Point-in-Time (PIT).
+ *
+ * @param pitId The PIT ID to delete
+ * @return True if the PIT was successfully deleted
+ */
+ public boolean deletePointInTime(String pitId) {
+ String endpoint = "/_pit";
+ Request request = new Request("DELETE", endpoint);
+ Map<String, String> requestBody = new HashMap<>();
+ requestBody.put("id", pitId);
+ request.setJsonEntity(JsonUtils.toJsonString(requestBody));
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.DELETE_PIT_FAILED,
+ "DELETE " + endpoint + " response null");
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String entity = EntityUtils.toString(response.getEntity());
+ JsonNode jsonNode = JsonUtils.parseObject(entity);
+ return jsonNode.get("succeeded").asBoolean();
+ } else {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.DELETE_PIT_FAILED,
+ String.format(
+ "DELETE %s response status code=%d",
+ endpoint,
response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.DELETE_PIT_FAILED, ex);
+ }
+ }
+
+ /**
+ * Searches using a Point-in-Time (PIT).
+ *
+ * @param pitId The PIT ID to use
+ * @param source The fields to include in the response
+ * @param query The query to execute
+ * @param batchSize The number of documents to return
+ * @param searchAfter The sort values to search after (for pagination)
+ * @param keepAlive The time to keep the PIT alive (in milliseconds)
+ * @return The search results
+ */
+ public PointInTimeResult searchWithPointInTime(
+ String pitId,
+ List<String> source,
+ Map<String, Object> query,
+ int batchSize,
+ Object[] searchAfter,
+ long keepAlive) {
+
+ Map<String, Object> requestBody = new HashMap<>();
+ requestBody.put("size", batchSize);
+ requestBody.put("query", query);
+ requestBody.put("_source", source);
+
+ // Add PIT information
+ Map<String, Object> pit = new HashMap<>();
+ pit.put("id", pitId);
+ pit.put("keep_alive", keepAlive + "ms");
+ requestBody.put("pit", pit);
+
+ // Add sort for search_after
+ List<Map<String, String>> sort = new ArrayList<>();
+ sort.add(Collections.singletonMap("_shard_doc", "asc"));
+ requestBody.put("sort", sort);
+
+ // Add search_after if provided
+ if (searchAfter != null && searchAfter.length > 0) {
+ requestBody.put("search_after", searchAfter);
+ }
+
+ String endpoint = "/_search";
+ Request request = new Request("POST", endpoint);
+ request.setJsonEntity(JsonUtils.toJsonString(requestBody));
+
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.SEARCH_WITH_PIT_FAILED,
+ "POST " + endpoint + " response null");
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String entity = EntityUtils.toString(response.getEntity());
+ return parsePointInTimeResponse(entity, pitId);
+ } else {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.SEARCH_WITH_PIT_FAILED,
+ String.format(
+ "POST %s response status code=%d",
+ endpoint,
response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new ElasticsearchConnectorException(
+ ElasticsearchConnectorErrorCode.SEARCH_WITH_PIT_FAILED,
ex);
+ }
+ }
+
+ /**
+ * Parses the response from a Point-in-Time search.
+ *
+ * @param responseJson The JSON response from Elasticsearch
+ * @param pitId The PIT ID used for the search
+ * @return The parsed search results
+ */
+ private PointInTimeResult parsePointInTimeResponse(String responseJson,
String pitId) {
+ JsonNode rootNode = JsonUtils.parseObject(responseJson);
+ JsonNode hitsNode = rootNode.get("hits");
+ JsonNode totalNode = hitsNode.get("total");
+ long totalHits = totalNode.get("value").asLong();
+
+ List<Map<String, Object>> docs = new ArrayList<>();
+ JsonNode hitsArray = hitsNode.get("hits");
+ Object[] searchAfter = null;
+
+ for (JsonNode hit : hitsArray) {
+ Map<String, Object> doc = new HashMap<>();
+ // Add metadata fields
+ doc.put("_index", hit.get("_index").textValue());
+ doc.put("_id", hit.get("_id").textValue());
+ if (hit.has("_type")) {
+ doc.put("_type", hit.get("_type").textValue());
+ }
+
+ // Extract document source fields
+ JsonNode source = hit.get("_source");
+ for (Iterator<Map.Entry<String, JsonNode>> iterator =
source.fields();
+ iterator.hasNext(); ) {
+ Map.Entry<String, JsonNode> entry = iterator.next();
+ String fieldName = entry.getKey();
+ if (entry.getValue() instanceof TextNode) {
+ doc.put(fieldName, entry.getValue().textValue());
+ } else {
+ doc.put(fieldName, entry.getValue());
+ }
+ }
+ docs.add(doc);
+
+ // Get sort values from the last document for search_after
+ if (hit.has("sort")) {
+ searchAfter = new Object[hit.get("sort").size()];
+ for (int i = 0; i < searchAfter.length; i++) {
+ JsonNode sortValue = hit.get("sort").get(i);
+ if (sortValue.isNumber()) {
+ searchAfter[i] = sortValue.asDouble();
+ } else if (sortValue.isTextual()) {
+ searchAfter[i] = sortValue.asText();
+ } else {
+ searchAfter[i] = sortValue.toString();
+ }
+ }
+ }
+ }
+
+ // Get the updated PIT ID
+ String updatedPitId = rootNode.has("pit_id") ?
rootNode.get("pit_id").asText() : pitId;
+
+ // Determine if there are more results
+ boolean hasMore = docs.size() > 0 && totalHits > 0 && docs.size() <
totalHits;
+
+ return new PointInTimeResult(updatedPitId, docs, totalHits,
searchAfter, hasMore);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
index d1ee2d627c..1d95990d82 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
@@ -38,8 +38,14 @@ public class ElasticsearchConfig implements Serializable {
private String scrollTime;
private int scrollSize;
private SearchTypeEnum searchType;
+ private SearchApiTypeEnum searchApiType;
private String sqlQuery;
+ private long pitKeepAlive;
+ private int pitBatchSize;
+ private String pitId;
+ private Object[] searchAfter;
+
private CatalogTable catalogTable;
public ElasticsearchConfig clone() {
@@ -51,7 +57,13 @@ public class ElasticsearchConfig implements Serializable {
elasticsearchConfig.setScrollSize(scrollSize);
elasticsearchConfig.setCatalogTable(catalogTable);
elasticsearchConfig.setSearchType(searchType);
+ elasticsearchConfig.setSearchApiType(searchApiType);
elasticsearchConfig.setSqlQuery(sqlQuery);
+ elasticsearchConfig.setPitKeepAlive(pitKeepAlive);
+ elasticsearchConfig.setPitBatchSize(pitBatchSize);
+ elasticsearchConfig.setPitId(pitId);
+ elasticsearchConfig.setSearchAfter(searchAfter != null ?
searchAfter.clone() : null);
+
return elasticsearchConfig;
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
index bfd671dde5..d7810e1a77 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
@Getter
@Setter
@@ -65,7 +66,14 @@ public class ElasticsearchSourceOptions extends
ElasticsearchBaseOptions {
Options.key("search_type")
.enumType(SearchTypeEnum.class)
.defaultValue(SearchTypeEnum.DSL)
- .withDescription("Choose dsl syntax or x-pack sql.");
+ .withDescription("Choose query type: DSL (Domain Specific
Language) or SQL.");
+
+ public static final Option<SearchApiTypeEnum> SEARCH_API_TYPE =
+ Options.key("search_api_type")
+ .enumType(SearchApiTypeEnum.class)
+ .defaultValue(SearchApiTypeEnum.SCROLL)
+ .withDescription(
+ "Choose API type for pagination: SCROLL or PIT
(Point in Time).");
public static final Option<String> SQL_QUERY =
Options.key("sql_query")
@@ -87,4 +95,18 @@ public class ElasticsearchSourceOptions extends
ElasticsearchBaseOptions {
Collections.singletonMap("match_all", new
HashMap<String, String>()))
.withDescription(
"Elasticsearch query language. You can control the
range of data read");
+
+ public static final Option<Long> PIT_KEEP_ALIVE =
+ Options.key("pit_keep_alive")
+ .longType()
+ .defaultValue(TimeUnit.MINUTES.toMillis(1)) // 1 minute in
milliseconds
+ .withDescription(
+ "The amount of time (in milliseconds) for which
the PIT should be kept alive. Default is 1 minute.");
+
+ public static final Option<Integer> PIT_BATCH_SIZE =
+ Options.key("pit_batch_size")
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "Maximum number of hits to be returned with each
PIT search request. Similar to scroll_size but for PIT API.");
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchApiTypeEnum.java
similarity index 85%
copy from
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
copy to
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchApiTypeEnum.java
index 5cfe00b84e..c94427a6da 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchApiTypeEnum.java
@@ -17,7 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
-public enum SearchTypeEnum {
- DSL,
- SQL
+public enum SearchApiTypeEnum {
+ /** Use Scroll API for pagination */
+ SCROLL,
+
+ /** Use Point-in-Time (PIT) API for pagination */
+ PIT
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
index 5cfe00b84e..f4cd178ad1 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
@@ -18,6 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
public enum SearchTypeEnum {
+ /** Use Domain Specific Language (DSL) query */
DSL,
+
+ /** Use SQL query */
SQL
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/PointInTimeResult.java
similarity index 54%
copy from
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
copy to
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/PointInTimeResult.java
index 5cfe00b84e..d999ff68ce 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/PointInTimeResult.java
@@ -15,9 +15,33 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source;
-public enum SearchTypeEnum {
- DSL,
- SQL
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+/** DTO for Elasticsearch Point-in-Time search results. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PointInTimeResult {
+
+ /** The PIT ID used for this search */
+ private String pitId;
+
+ /** Documents returned by the search */
+ private List<Map<String, Object>> docs;
+
+ /** Total number of hits matching the query */
+ private long totalHits;
+
+ /** Sort values of the last document, used for pagination with
search_after */
+ private Object[] searchAfter;
+
+ /** Whether there are more results to fetch */
+ private boolean hasMore;
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index 5343e8ebc7..b804434b5a 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -35,8 +35,11 @@ public enum ElasticsearchConnectorErrorCode implements
SeaTunnelErrorCode {
"ELASTICSEARCH-11",
"'index' or 'index_list' must be configured, with at least one
being required."),
SOURCE_CONFIG_ERROR_02("ELASTICSEARCH-12", "'query' must be configured."),
- ADD_FIELD_FAILED("ELASTICSEARCH-12", "Field add failed"),
- SCHEMA_CHANGE_FAILED("ELASTICSEARCH-13", "Schema change failed"),
+ ADD_FIELD_FAILED("ELASTICSEARCH-13", "Field add failed"),
+ SCHEMA_CHANGE_FAILED("ELASTICSEARCH-14", "Schema change failed"),
+ CREATE_PIT_FAILED("ELASTICSEARCH-15", "Create Point-in-Time failed"),
+ DELETE_PIT_FAILED("ELASTICSEARCH-16", "Delete Point-in-Time failed"),
+ SEARCH_WITH_PIT_FAILED("ELASTICSEARCH-17", "Search with Point-in-Time
failed"),
;
private final String code;
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index fcc9fe59fd..281636245d 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -41,6 +41,7 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClie
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchApiTypeEnum;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchTypeEnum;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
@@ -56,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_API_TYPE;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_TYPE;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SQL_QUERY;
@@ -175,9 +177,14 @@ public class ElasticsearchSource
"");
}
SearchTypeEnum searchType = readonlyConfig.get(SEARCH_TYPE);
+ SearchApiTypeEnum searchApiType = readonlyConfig.get(SEARCH_API_TYPE);
String sqlQuery =
readonlyConfig.get(ElasticsearchSourceOptions.SQL_QUERY);
String scrollTime =
readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_TIME);
int scrollSize =
readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_SIZE);
+
+ long pitKeepAlive =
readonlyConfig.get(ElasticsearchSourceOptions.PIT_KEEP_ALIVE);
+ int pitBatchSize =
readonlyConfig.get(ElasticsearchSourceOptions.PIT_BATCH_SIZE);
+
ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig();
elasticsearchConfig.setSource(source);
elasticsearchConfig.setCatalogTable(catalogTable);
@@ -188,6 +195,10 @@ public class ElasticsearchSource
elasticsearchConfig.setCatalogTable(catalogTable);
elasticsearchConfig.setSqlQuery(sqlQuery);
elasticsearchConfig.setSearchType(searchType);
+ elasticsearchConfig.setSearchApiType(searchApiType);
+
+ elasticsearchConfig.setPitKeepAlive(pitKeepAlive);
+ elasticsearchConfig.setPitBatchSize(pitBatchSize);
return elasticsearchConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index 5485e13fad..fb11f72faf 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -40,9 +40,13 @@ import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.Ela
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.USERNAME;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.INDEX_LIST;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.PIT_BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.PIT_KEEP_ALIVE;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.QUERY;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SCROLL_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SCROLL_TIME;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_API_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_TYPE;
@AutoService(Factory.class)
public class ElasticsearchSourceFactory implements TableSourceFactory {
@@ -63,6 +67,10 @@ public class ElasticsearchSourceFactory implements
TableSourceFactory {
SCROLL_TIME,
SCROLL_SIZE,
QUERY,
+ PIT_KEEP_ALIVE,
+ PIT_BATCH_SIZE,
+ SEARCH_API_TYPE,
+ SEARCH_TYPE,
TLS_VERIFY_CERTIFICATE,
TLS_VERIFY_HOSTNAME,
TLS_KEY_STORE_PATH,
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index 7858dd3522..1971002c3d 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -24,7 +24,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchApiTypeEnum;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchTypeEnum;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.PointInTimeResult;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.ElasticsearchRecord;
@@ -96,8 +98,10 @@ public class ElasticsearchSourceReader
SeaTunnelRowDeserializer deserializer =
new DefaultSeaTunnelRowDeserializer(seaTunnelRowType);
- // sql client
+
+ // SQL client
if (SearchTypeEnum.SQL.equals(sourceIndexInfo.getSearchType())) {
+ log.info("Using SQL query for index: {}",
sourceIndexInfo.getIndex());
ScrollResult scrollResult =
esRestClient.searchBySql(
sourceIndexInfo.getSqlQuery(),
sourceIndexInfo.getScrollSize());
@@ -110,19 +114,93 @@ public class ElasticsearchSourceReader
outputFromScrollResult(scrollResult, sourceIndexInfo, output,
deserializer);
}
} else {
- ScrollResult scrollResult =
- esRestClient.searchByScroll(
- sourceIndexInfo.getIndex(),
+ // Check if we should use PIT API
+ if
(SearchApiTypeEnum.PIT.equals(sourceIndexInfo.getSearchApiType())) {
+ log.info("Using Point-in-Time (PIT) API for index: {}",
sourceIndexInfo.getIndex());
+ searchWithPointInTime(sourceIndexInfo, output, deserializer);
+ } else {
+ log.info("Using Scroll API for index: {}",
sourceIndexInfo.getIndex());
+ ScrollResult scrollResult =
+ esRestClient.searchByScroll(
+ sourceIndexInfo.getIndex(),
+ sourceIndexInfo.getSource(),
+ sourceIndexInfo.getQuery(),
+ sourceIndexInfo.getScrollTime(),
+ sourceIndexInfo.getScrollSize());
+ outputFromScrollResult(scrollResult, sourceIndexInfo, output,
deserializer);
+ while (scrollResult.getDocs() != null &&
!scrollResult.getDocs().isEmpty()) {
+ scrollResult =
+ esRestClient.searchWithScrollId(
+ scrollResult.getScrollId(),
sourceIndexInfo.getScrollTime());
+ outputFromScrollResult(scrollResult, sourceIndexInfo,
output, deserializer);
+ }
+ }
+ }
+ }
+
+ /**
+ * Search using Point-in-Time API.
+ *
+ * @param sourceIndexInfo The Elasticsearch configuration
+ * @param output The collector to output rows
+ * @param deserializer The deserializer to convert Elasticsearch records
to SeaTunnel rows
+ */
+ private void searchWithPointInTime(
+ ElasticsearchConfig sourceIndexInfo,
+ Collector<SeaTunnelRow> output,
+ SeaTunnelRowDeserializer deserializer) {
+
+ // Create a PIT
+ String pitId =
+ esRestClient.createPointInTime(
+ sourceIndexInfo.getIndex(),
sourceIndexInfo.getPitKeepAlive());
+ sourceIndexInfo.setPitId(pitId);
+ log.info(
+ "Created Point-in-Time with ID: {} for index: {}",
+ pitId,
+ sourceIndexInfo.getIndex());
+
+ try {
+ // Initial search
+ PointInTimeResult pitResult =
+ esRestClient.searchWithPointInTime(
+ pitId,
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
- sourceIndexInfo.getScrollTime(),
- sourceIndexInfo.getScrollSize());
- outputFromScrollResult(scrollResult, sourceIndexInfo, output,
deserializer);
- while (scrollResult.getDocs() != null &&
!scrollResult.getDocs().isEmpty()) {
- scrollResult =
- esRestClient.searchWithScrollId(
- scrollResult.getScrollId(),
sourceIndexInfo.getScrollTime());
- outputFromScrollResult(scrollResult, sourceIndexInfo, output,
deserializer);
+ sourceIndexInfo.getPitBatchSize(),
+ null, // No search_after for first request
+ sourceIndexInfo.getPitKeepAlive());
+
+ // Output the results
+ outputFromPitResult(pitResult, sourceIndexInfo, output,
deserializer);
+
+ // Continue searching while there are more results
+ while (pitResult.isHasMore()) {
+ // Update the PIT ID and search_after values for the next
request
+ sourceIndexInfo.setPitId(pitResult.getPitId());
+ sourceIndexInfo.setSearchAfter(pitResult.getSearchAfter());
+
+ // Execute the next search
+ pitResult =
+ esRestClient.searchWithPointInTime(
+ sourceIndexInfo.getPitId(),
+ sourceIndexInfo.getSource(),
+ sourceIndexInfo.getQuery(),
+ sourceIndexInfo.getPitBatchSize(),
+ sourceIndexInfo.getSearchAfter(),
+ sourceIndexInfo.getPitKeepAlive());
+
+ // Output the results
+ outputFromPitResult(pitResult, sourceIndexInfo, output,
deserializer);
+ }
+ } finally {
+ // Always clean up the PIT when done
+ if (pitId != null) {
+ try {
+ esRestClient.deletePointInTime(pitId);
+ } catch (Exception e) {
+ log.warn("Failed to delete Point-in-Time with ID: " +
pitId, e);
+ }
}
}
}
@@ -141,6 +219,28 @@ public class ElasticsearchSourceReader
}
}
+ /**
+ * Output rows from a Point-in-Time search result.
+ *
+ * @param pitResult The Point-in-Time search result
+ * @param elasticsearchConfig The Elasticsearch configuration
+ * @param output The collector to output rows
+ * @param deserializer The deserializer to convert Elasticsearch records
to SeaTunnel rows
+ */
+ private void outputFromPitResult(
+ PointInTimeResult pitResult,
+ ElasticsearchConfig elasticsearchConfig,
+ Collector<SeaTunnelRow> output,
+ SeaTunnelRowDeserializer deserializer) {
+ List<String> source = elasticsearchConfig.getSource();
+ String tableId =
elasticsearchConfig.getCatalogTable().getTablePath().toString();
+ for (Map<String, Object> doc : pitResult.getDocs()) {
+ SeaTunnelRow seaTunnelRow =
+ deserializer.deserialize(new ElasticsearchRecord(doc,
source, tableId));
+ output.collect(seaTunnelRow);
+ }
+ }
+
@Override
public List<ElasticsearchSourceSplit> snapshotState(long checkpointId)
throws Exception {
return new ArrayList<>(splits);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 61ba8b9093..37de01a474 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -316,6 +316,17 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
+ @TestTemplate
+ public void testElasticsearchWithPIT(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+
container.executeJob("/elasticsearch/elasticsearch_source_with_pit.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> sinkData = readSinkDataWithSchema("st_index_pit");
+ // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
+ Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
+ }
+
@TestTemplate
public void testElasticsearchWithNestSchema(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_with_pit.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_with_pit.conf
new file mode 100644
index 0000000000..fd52a4aeda
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_with_pit.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of using PIT API in Elasticsearch
connector
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+
+ index = "st_index"
+ query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+
+ # Use DSL query with PIT API
+ search_type = "DSL"
+ search_api_type = "PIT"
+ pit_keep_alive = 60000 # 1 minute in milliseconds
+ pit_batch_size = 100
+
+ schema = {
+ fields {
+ c_map = "map<string, tinyint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_int = int
+ c_date = date
+ c_timestamp = timestamp
+ c_null = "null"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Elasticsearch {
+ hosts = ["https://elasticsearch:9200"]
+ username = "elastic"
+ password = "elasticsearch"
+ tls_verify_certificate = false
+ tls_verify_hostname = false
+
+ index = "st_index_pit"
+ index_type = "st"
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="APPEND_DATA"
+ }
+}