This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 948d588d06 [Feature][connector-elasticsearch] elasticsearch source 
support PIT (#9150)
948d588d06 is described below

commit 948d588d06c10d8def5d740ff164cadb380aee66
Author: CosmosNi <[email protected]>
AuthorDate: Wed Apr 16 09:32:55 2025 +0800

    [Feature][connector-elasticsearch] elasticsearch source support PIT (#9150)
---
 docs/en/connector-v2/source/Elasticsearch.md       |  51 +++++-
 docs/zh/connector-v2/source/Elasticsearch.md       |  51 +++++-
 .../elasticsearch/client/EsRestClient.java         | 204 +++++++++++++++++++++
 .../elasticsearch/config/ElasticsearchConfig.java  |  12 ++
 .../config/ElasticsearchSourceOptions.java         |  24 ++-
 ...{SearchTypeEnum.java => SearchApiTypeEnum.java} |   9 +-
 .../elasticsearch/config/SearchTypeEnum.java       |   3 +
 .../source/PointInTimeResult.java}                 |  32 +++-
 .../exception/ElasticsearchConnectorErrorCode.java |   7 +-
 .../elasticsearch/source/ElasticsearchSource.java  |  11 ++
 .../source/ElasticsearchSourceFactory.java         |   8 +
 .../source/ElasticsearchSourceReader.java          | 124 +++++++++++--
 .../connector/elasticsearch/ElasticsearchIT.java   |  11 ++
 .../elasticsearch_source_with_pit.conf             |  82 +++++++++
 14 files changed, 595 insertions(+), 34 deletions(-)

diff --git a/docs/en/connector-v2/source/Elasticsearch.md 
b/docs/en/connector-v2/source/Elasticsearch.md
index 92299b7b2d..dff046767b 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -30,8 +30,9 @@ support version >= 2.x and <= 8.x.
 | index_list              | array   | no       | used to define a multiple 
table task                           |
 | source                  | array   | no       | -                             
                                 |
 | query                   | json    | no       | {"match_all": {}}             
                                 |
-| search_type             | json    | no       | Search method,sql or 
dsl,default dsl                           |
-| sql_query               | json    | no       | sql query                     
                                 |
+| search_type             | enum    | no       | Query type, SQL or DSL, 
default DSL                            |
+| search_api_type         | enum    | no       | Pagination API type, SCROLL 
or PIT, default SCROLL             |
+| sql_query               | json    | no       | SQL query, required when 
search_type is SQL                    |
 | scroll_time             | string  | no       | 1m                            
                                 |
 | scroll_size             | int     | no       | 100                           
                                 |
 | tls_verify_certificate  | boolean | no       | true                          
                                 |
@@ -41,6 +42,8 @@ support version >= 2.x and <= 8.x.
 | tls_keystore_password   | string  | no       | -                             
                                 |
 | tls_truststore_path     | string  | no       | -                             
                                 |
 | tls_truststore_password | string  | no       | -                             
                                 |
+| pit_keep_alive          | long    | no       | 60000 (1 minute)              
                                 |
+| pit_batch_size          | int     | no       | 100                           
                                 |
 | common-options          |         | no       | -                             
                                 |
 
 
@@ -113,6 +116,22 @@ The path to PEM or JKS trust store. This file must be 
readable by the operating
 
 The key password for the trust store specified
 
+### search_type
+Query type, available values:
+- DSL: Use Domain Specific Language query (default)
+- SQL: Use SQL query
+
+### search_api_type
+Pagination API type, available values:
+- SCROLL: Use Scroll API for pagination (default)
+- PIT: Use Point in Time (PIT) API for pagination
+
+### pit_keep_alive [long]
+The amount of time (in milliseconds) for which the PIT should be kept alive
+
+### pit_batch_size [int]
+Maximum number of hits to be returned with each PIT search request
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details
@@ -177,7 +196,7 @@ source {
            c_date2,
            c_null
            ]
-           
+
        }
 
     ]
@@ -214,7 +233,7 @@ source {
        hosts = ["https://localhost:9200"]
         username = "elastic"
         password = "elasticsearch"
-        
+
         tls_verify_certificate = false
     }
 }
@@ -228,7 +247,7 @@ source {
        hosts = ["https://localhost:9200"]
         username = "elastic"
         password = "elasticsearch"
-        
+
         tls_verify_hostname = false
     }
 }
@@ -242,7 +261,7 @@ source {
        hosts = ["https://localhost:9200"]
         username = "elastic"
         password = "elasticsearch"
-        
+
         tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
         tls_keystore_password = "${your password}"
     }
@@ -266,6 +285,26 @@ source {
 }
 ```
 
+Demo7:  PIT
+```hocon
+source {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "st_index"
+    query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+
+    # Use DSL query with PIT API
+    search_type = DSL
+    search_api_type = PIT
+    pit_keep_alive = 60000  # 1 minute in milliseconds
+    pit_batch_size = 100
+  }
+}
+```
+
 ## Changelog
 
 <ChangeLog />
\ No newline at end of file
diff --git a/docs/zh/connector-v2/source/Elasticsearch.md 
b/docs/zh/connector-v2/source/Elasticsearch.md
index 9c9c6dd3cc..ae6ed421bd 100644
--- a/docs/zh/connector-v2/source/Elasticsearch.md
+++ b/docs/zh/connector-v2/source/Elasticsearch.md
@@ -28,8 +28,9 @@ import ChangeLog from 
'../changelog/connector-elasticsearch.md';
 | index_list              | array   | no       | 用来定义多索引同步任务                   
      |
 | source                  | array   | no       | -                             
      |
 | query                   | json    | no       | {"match_all": {}}             
      |
-| search_type             | json    | no       | 查询方式,sql或者dsl,默认 dsl          
      |
-| sql_query               | json    | no       | sql 查询语句                      
      |
+| search_type             | enum    | no       | 查询类型,SQL 或 DSL,默认 DSL         
     |
+| search_api_type         | enum    | no       | 分页 API 类型,SCROLL 或 PIT,默认 
SCROLL    |
+| sql_query               | json    | no       | SQL 查询语句,当 search_type 为 SQL 
时必须    |
 | scroll_time             | string  | no       | 1m                            
      |
 | scroll_size             | int     | no       | 100                           
      |
 | tls_verify_certificate  | boolean | no       | true                          
      |
@@ -39,6 +40,8 @@ import ChangeLog from 
'../changelog/connector-elasticsearch.md';
 | tls_keystore_password   | string  | no       | -                             
      |
 | tls_truststore_path     | string  | no       | -                             
      |
 | tls_truststore_password | string  | no       | -                             
      |
+| pit_keep_alive          | long    | no       | 60000 (1 minute)              
      |
+| pit_batch_size          | int     | no       | 100                           
      |
 | common-options          |         | no       | -                             
      |
 
 ### hosts [array]
@@ -115,6 +118,22 @@ PEM 或 JKS 信任库的路径。该文件必须对运行 SeaTunnel 的操作系
 
 指定信任库的密钥密码。
 
+### search_type
+查询类型,可选值:
+- DSL: 使用 Domain Specific Language 查询(默认)
+- SQL: 使用 SQL 查询
+
+### search_api_type
+分页 API 类型,可选值:
+- SCROLL: 使用 Scroll API 进行分页(默认)
+- PIT: 使用 Point in Time (PIT) API 进行分页
+
+### pit_keep_alive [long]
+PIT 应保持活动的时间量(以毫秒为单位)
+
+### pit_batch_size [int]
+每次 PIT 搜索请求返回的最大数量
+
 ### common options
 
 Source 插件常用参数,具体请参考 [Source 常用选项](../source-common-options.md)
@@ -180,7 +199,7 @@ source {
            c_date2,
            c_null
            ]
-           
+
        }
 
     ]
@@ -215,7 +234,7 @@ source {
        hosts = ["https://localhost:9200"]
         username = "elastic"
         password = "elasticsearch"
-        
+
         tls_verify_certificate = false
     }
 }
@@ -229,7 +248,7 @@ source {
        hosts = ["https://localhost:9200"]
         username = "elastic"
         password = "elasticsearch"
-        
+
         tls_verify_hostname = false
     }
 }
@@ -243,7 +262,7 @@ source {
        hosts = ["https://localhost:9200"]
         username = "elastic"
         password = "elasticsearch"
-        
+
         tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
         tls_keystore_password = "${your password}"
     }
@@ -267,6 +286,26 @@ source {
 }
 ```
 
+Demo7:  PIT方式滚动查询
+```hocon
+source {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "st_index"
+    query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+
+    # 使用 DSL 查询和 PIT API
+    search_type = DSL
+    search_api_type = PIT
+    pit_keep_alive = 60000  # 1 minute in milliseconds
+    pit_batch_size = 100
+  }
+}
+```
+
 ## 变更日志
 
 <ChangeLog />
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index b2be5dac29..69c34faef6 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.Elasticsea
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.PointInTimeResult;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
@@ -61,6 +62,7 @@ import javax.net.ssl.SSLContext;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -876,4 +878,206 @@ public class EsRestClient implements Closeable {
                     ex);
         }
     }
+
+    /**
+     * Creates a Point-in-Time (PIT) for the specified index.
+     *
+     * @param index The index to create a PIT for
+     * @param keepAlive The time to keep the PIT alive (in milliseconds)
+     * @return The PIT ID
+     */
+    public String createPointInTime(String index, long keepAlive) {
+        String endpoint = String.format("/%s/_pit?keep_alive=%dms", 
index.toLowerCase(), keepAlive);
+        Request request = new Request("POST", endpoint);
+        try {
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                throw new ElasticsearchConnectorException(
+                        ElasticsearchConnectorErrorCode.CREATE_PIT_FAILED,
+                        "POST " + endpoint + " response null");
+            }
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                String entity = EntityUtils.toString(response.getEntity());
+                JsonNode jsonNode = JsonUtils.parseObject(entity);
+                return jsonNode.get("id").asText();
+            } else {
+                throw new ElasticsearchConnectorException(
+                        ElasticsearchConnectorErrorCode.CREATE_PIT_FAILED,
+                        String.format(
+                                "POST %s response status code=%d",
+                                endpoint, 
response.getStatusLine().getStatusCode()));
+            }
+        } catch (IOException ex) {
+            throw new ElasticsearchConnectorException(
+                    ElasticsearchConnectorErrorCode.CREATE_PIT_FAILED, ex);
+        }
+    }
+
+    /**
+     * Deletes a Point-in-Time (PIT).
+     *
+     * @param pitId The PIT ID to delete
+     * @return True if the PIT was successfully deleted
+     */
+    public boolean deletePointInTime(String pitId) {
+        String endpoint = "/_pit";
+        Request request = new Request("DELETE", endpoint);
+        Map<String, String> requestBody = new HashMap<>();
+        requestBody.put("id", pitId);
+        request.setJsonEntity(JsonUtils.toJsonString(requestBody));
+        try {
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                throw new ElasticsearchConnectorException(
+                        ElasticsearchConnectorErrorCode.DELETE_PIT_FAILED,
+                        "DELETE " + endpoint + " response null");
+            }
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                String entity = EntityUtils.toString(response.getEntity());
+                JsonNode jsonNode = JsonUtils.parseObject(entity);
+                return jsonNode.get("succeeded").asBoolean();
+            } else {
+                throw new ElasticsearchConnectorException(
+                        ElasticsearchConnectorErrorCode.DELETE_PIT_FAILED,
+                        String.format(
+                                "DELETE %s response status code=%d",
+                                endpoint, 
response.getStatusLine().getStatusCode()));
+            }
+        } catch (IOException ex) {
+            throw new ElasticsearchConnectorException(
+                    ElasticsearchConnectorErrorCode.DELETE_PIT_FAILED, ex);
+        }
+    }
+
+    /**
+     * Searches using a Point-in-Time (PIT).
+     *
+     * @param pitId The PIT ID to use
+     * @param source The fields to include in the response
+     * @param query The query to execute
+     * @param batchSize The number of documents to return
+     * @param searchAfter The sort values to search after (for pagination)
+     * @param keepAlive The time to keep the PIT alive (in milliseconds)
+     * @return The search results
+     */
+    public PointInTimeResult searchWithPointInTime(
+            String pitId,
+            List<String> source,
+            Map<String, Object> query,
+            int batchSize,
+            Object[] searchAfter,
+            long keepAlive) {
+
+        Map<String, Object> requestBody = new HashMap<>();
+        requestBody.put("size", batchSize);
+        requestBody.put("query", query);
+        requestBody.put("_source", source);
+
+        // Add PIT information
+        Map<String, Object> pit = new HashMap<>();
+        pit.put("id", pitId);
+        pit.put("keep_alive", keepAlive + "ms");
+        requestBody.put("pit", pit);
+
+        // Add sort for search_after
+        List<Map<String, String>> sort = new ArrayList<>();
+        sort.add(Collections.singletonMap("_shard_doc", "asc"));
+        requestBody.put("sort", sort);
+
+        // Add search_after if provided
+        if (searchAfter != null && searchAfter.length > 0) {
+            requestBody.put("search_after", searchAfter);
+        }
+
+        String endpoint = "/_search";
+        Request request = new Request("POST", endpoint);
+        request.setJsonEntity(JsonUtils.toJsonString(requestBody));
+
+        try {
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                throw new ElasticsearchConnectorException(
+                        ElasticsearchConnectorErrorCode.SEARCH_WITH_PIT_FAILED,
+                        "POST " + endpoint + " response null");
+            }
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                String entity = EntityUtils.toString(response.getEntity());
+                return parsePointInTimeResponse(entity, pitId);
+            } else {
+                throw new ElasticsearchConnectorException(
+                        ElasticsearchConnectorErrorCode.SEARCH_WITH_PIT_FAILED,
+                        String.format(
+                                "POST %s response status code=%d",
+                                endpoint, 
response.getStatusLine().getStatusCode()));
+            }
+        } catch (IOException ex) {
+            throw new ElasticsearchConnectorException(
+                    ElasticsearchConnectorErrorCode.SEARCH_WITH_PIT_FAILED, 
ex);
+        }
+    }
+
+    /**
+     * Parses the response from a Point-in-Time search.
+     *
+     * @param responseJson The JSON response from Elasticsearch
+     * @param pitId The PIT ID used for the search
+     * @return The parsed search results
+     */
+    private PointInTimeResult parsePointInTimeResponse(String responseJson, 
String pitId) {
+        JsonNode rootNode = JsonUtils.parseObject(responseJson);
+        JsonNode hitsNode = rootNode.get("hits");
+        JsonNode totalNode = hitsNode.get("total");
+        long totalHits = totalNode.get("value").asLong();
+
+        List<Map<String, Object>> docs = new ArrayList<>();
+        JsonNode hitsArray = hitsNode.get("hits");
+        Object[] searchAfter = null;
+
+        for (JsonNode hit : hitsArray) {
+            Map<String, Object> doc = new HashMap<>();
+            // Add metadata fields
+            doc.put("_index", hit.get("_index").textValue());
+            doc.put("_id", hit.get("_id").textValue());
+            if (hit.has("_type")) {
+                doc.put("_type", hit.get("_type").textValue());
+            }
+
+            // Extract document source fields
+            JsonNode source = hit.get("_source");
+            for (Iterator<Map.Entry<String, JsonNode>> iterator = 
source.fields();
+                    iterator.hasNext(); ) {
+                Map.Entry<String, JsonNode> entry = iterator.next();
+                String fieldName = entry.getKey();
+                if (entry.getValue() instanceof TextNode) {
+                    doc.put(fieldName, entry.getValue().textValue());
+                } else {
+                    doc.put(fieldName, entry.getValue());
+                }
+            }
+            docs.add(doc);
+
+            // Get sort values from the last document for search_after
+            if (hit.has("sort")) {
+                searchAfter = new Object[hit.get("sort").size()];
+                for (int i = 0; i < searchAfter.length; i++) {
+                    JsonNode sortValue = hit.get("sort").get(i);
+                    if (sortValue.isNumber()) {
+                        searchAfter[i] = sortValue.asDouble();
+                    } else if (sortValue.isTextual()) {
+                        searchAfter[i] = sortValue.asText();
+                    } else {
+                        searchAfter[i] = sortValue.toString();
+                    }
+                }
+            }
+        }
+
+        // Get the updated PIT ID
+        String updatedPitId = rootNode.has("pit_id") ? 
rootNode.get("pit_id").asText() : pitId;
+
+        // Determine if there are more results
+        boolean hasMore = docs.size() > 0 && totalHits > 0 && docs.size() < 
totalHits;
+
+        return new PointInTimeResult(updatedPitId, docs, totalHits, 
searchAfter, hasMore);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
index d1ee2d627c..1d95990d82 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java
@@ -38,8 +38,14 @@ public class ElasticsearchConfig implements Serializable {
     private String scrollTime;
     private int scrollSize;
     private SearchTypeEnum searchType;
+    private SearchApiTypeEnum searchApiType;
     private String sqlQuery;
 
+    private long pitKeepAlive;
+    private int pitBatchSize;
+    private String pitId;
+    private Object[] searchAfter;
+
     private CatalogTable catalogTable;
 
     public ElasticsearchConfig clone() {
@@ -51,7 +57,13 @@ public class ElasticsearchConfig implements Serializable {
         elasticsearchConfig.setScrollSize(scrollSize);
         elasticsearchConfig.setCatalogTable(catalogTable);
         elasticsearchConfig.setSearchType(searchType);
+        elasticsearchConfig.setSearchApiType(searchApiType);
         elasticsearchConfig.setSqlQuery(sqlQuery);
+        elasticsearchConfig.setPitKeepAlive(pitKeepAlive);
+        elasticsearchConfig.setPitBatchSize(pitBatchSize);
+        elasticsearchConfig.setPitId(pitId);
+        elasticsearchConfig.setSearchAfter(searchAfter != null ? 
searchAfter.clone() : null);
+
         return elasticsearchConfig;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
index bfd671dde5..d7810e1a77 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 @Getter
 @Setter
@@ -65,7 +66,14 @@ public class ElasticsearchSourceOptions extends 
ElasticsearchBaseOptions {
             Options.key("search_type")
                     .enumType(SearchTypeEnum.class)
                     .defaultValue(SearchTypeEnum.DSL)
-                    .withDescription("Choose dsl syntax or x-pack sql.");
+                    .withDescription("Choose query type: DSL (Domain Specific 
Language) or SQL.");
+
+    public static final Option<SearchApiTypeEnum> SEARCH_API_TYPE =
+            Options.key("search_api_type")
+                    .enumType(SearchApiTypeEnum.class)
+                    .defaultValue(SearchApiTypeEnum.SCROLL)
+                    .withDescription(
+                            "Choose API type for pagination: SCROLL or PIT 
(Point in Time).");
 
     public static final Option<String> SQL_QUERY =
             Options.key("sql_query")
@@ -87,4 +95,18 @@ public class ElasticsearchSourceOptions extends 
ElasticsearchBaseOptions {
                             Collections.singletonMap("match_all", new 
HashMap<String, String>()))
                     .withDescription(
                             "Elasticsearch query language. You can control the 
range of data read");
+
+    public static final Option<Long> PIT_KEEP_ALIVE =
+            Options.key("pit_keep_alive")
+                    .longType()
+                    .defaultValue(TimeUnit.MINUTES.toMillis(1)) // 1 minute in 
milliseconds
+                    .withDescription(
+                            "The amount of time (in milliseconds) for which 
the PIT should be kept alive. Default is 1 minute.");
+
+    public static final Option<Integer> PIT_BATCH_SIZE =
+            Options.key("pit_batch_size")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription(
+                            "Maximum number of hits to be returned with each 
PIT search request. Similar to scroll_size but for PIT API.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchApiTypeEnum.java
similarity index 85%
copy from 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
copy to 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchApiTypeEnum.java
index 5cfe00b84e..c94427a6da 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchApiTypeEnum.java
@@ -17,7 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
 
-public enum SearchTypeEnum {
-    DSL,
-    SQL
+public enum SearchApiTypeEnum {
+    /** Use Scroll API for pagination */
+    SCROLL,
+
+    /** Use Point-in-Time (PIT) API for pagination */
+    PIT
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
index 5cfe00b84e..f4cd178ad1 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
@@ -18,6 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
 
 public enum SearchTypeEnum {
+    /** Use Domain Specific Language (DSL) query */
     DSL,
+
+    /** Use SQL query */
     SQL
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/PointInTimeResult.java
similarity index 54%
copy from 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
copy to 
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/PointInTimeResult.java
index 5cfe00b84e..d999ff68ce 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/source/PointInTimeResult.java
@@ -15,9 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source;
 
-public enum SearchTypeEnum {
-    DSL,
-    SQL
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+/** DTO for Elasticsearch Point-in-Time search results. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PointInTimeResult {
+
+    /** The PIT ID used for this search */
+    private String pitId;
+
+    /** Documents returned by the search */
+    private List<Map<String, Object>> docs;
+
+    /** Total number of hits matching the query */
+    private long totalHits;
+
+    /** Sort values of the last document, used for pagination with 
search_after */
+    private Object[] searchAfter;
+
+    /** Whether there are more results to fetch */
+    private boolean hasMore;
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index 5343e8ebc7..b804434b5a 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -35,8 +35,11 @@ public enum ElasticsearchConnectorErrorCode implements 
SeaTunnelErrorCode {
             "ELASTICSEARCH-11",
             "'index' or 'index_list' must be configured, with at least one 
being required."),
     SOURCE_CONFIG_ERROR_02("ELASTICSEARCH-12", "'query' must be configured."),
-    ADD_FIELD_FAILED("ELASTICSEARCH-12", "Field add failed"),
-    SCHEMA_CHANGE_FAILED("ELASTICSEARCH-13", "Schema change failed"),
+    ADD_FIELD_FAILED("ELASTICSEARCH-13", "Field add failed"),
+    SCHEMA_CHANGE_FAILED("ELASTICSEARCH-14", "Schema change failed"),
+    CREATE_PIT_FAILED("ELASTICSEARCH-15", "Create Point-in-Time failed"),
+    DELETE_PIT_FAILED("ELASTICSEARCH-16", "Delete Point-in-Time failed"),
+    SEARCH_WITH_PIT_FAILED("ELASTICSEARCH-17", "Search with Point-in-Time 
failed"),
     ;
 
     private final String code;
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index fcc9fe59fd..281636245d 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -41,6 +41,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClie
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchApiTypeEnum;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchTypeEnum;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
@@ -56,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_API_TYPE;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_TYPE;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SQL_QUERY;
 
@@ -175,9 +177,14 @@ public class ElasticsearchSource
                             "");
         }
         SearchTypeEnum searchType = readonlyConfig.get(SEARCH_TYPE);
+        SearchApiTypeEnum searchApiType = readonlyConfig.get(SEARCH_API_TYPE);
         String sqlQuery = 
readonlyConfig.get(ElasticsearchSourceOptions.SQL_QUERY);
         String scrollTime = 
readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_TIME);
         int scrollSize = 
readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_SIZE);
+
+        long pitKeepAlive = 
readonlyConfig.get(ElasticsearchSourceOptions.PIT_KEEP_ALIVE);
+        int pitBatchSize = 
readonlyConfig.get(ElasticsearchSourceOptions.PIT_BATCH_SIZE);
+
         ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig();
         elasticsearchConfig.setSource(source);
         elasticsearchConfig.setCatalogTable(catalogTable);
@@ -188,6 +195,10 @@ public class ElasticsearchSource
         elasticsearchConfig.setCatalogTable(catalogTable);
         elasticsearchConfig.setSqlQuery(sqlQuery);
         elasticsearchConfig.setSearchType(searchType);
+        elasticsearchConfig.setSearchApiType(searchApiType);
+
+        elasticsearchConfig.setPitKeepAlive(pitKeepAlive);
+        elasticsearchConfig.setPitBatchSize(pitBatchSize);
         return elasticsearchConfig;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
index 5485e13fad..fb11f72faf 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -40,9 +40,13 @@ import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.Ela
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.USERNAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.INDEX_LIST;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.PIT_BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.PIT_KEEP_ALIVE;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.QUERY;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SCROLL_SIZE;
 import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SCROLL_TIME;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_API_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_TYPE;
 
 @AutoService(Factory.class)
 public class ElasticsearchSourceFactory implements TableSourceFactory {
@@ -63,6 +67,10 @@ public class ElasticsearchSourceFactory implements 
TableSourceFactory {
                         SCROLL_TIME,
                         SCROLL_SIZE,
                         QUERY,
+                        PIT_KEEP_ALIVE,
+                        PIT_BATCH_SIZE,
+                        SEARCH_API_TYPE,
+                        SEARCH_TYPE,
                         TLS_VERIFY_CERTIFICATE,
                         TLS_VERIFY_HOSTNAME,
                         TLS_KEY_STORE_PATH,
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
index 7858dd3522..1971002c3d 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java
@@ -24,7 +24,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchApiTypeEnum;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchTypeEnum;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.PointInTimeResult;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.ElasticsearchRecord;
@@ -96,8 +98,10 @@ public class ElasticsearchSourceReader
 
         SeaTunnelRowDeserializer deserializer =
                 new DefaultSeaTunnelRowDeserializer(seaTunnelRowType);
-        // sql client
+
+        // SQL client
         if (SearchTypeEnum.SQL.equals(sourceIndexInfo.getSearchType())) {
+            log.info("Using SQL query for index: {}", 
sourceIndexInfo.getIndex());
             ScrollResult scrollResult =
                     esRestClient.searchBySql(
                             sourceIndexInfo.getSqlQuery(), 
sourceIndexInfo.getScrollSize());
@@ -110,19 +114,93 @@ public class ElasticsearchSourceReader
                 outputFromScrollResult(scrollResult, sourceIndexInfo, output, 
deserializer);
             }
         } else {
-            ScrollResult scrollResult =
-                    esRestClient.searchByScroll(
-                            sourceIndexInfo.getIndex(),
+            // Check if we should use PIT API
+            if 
(SearchApiTypeEnum.PIT.equals(sourceIndexInfo.getSearchApiType())) {
+                log.info("Using Point-in-Time (PIT) API for index: {}", 
sourceIndexInfo.getIndex());
+                searchWithPointInTime(sourceIndexInfo, output, deserializer);
+            } else {
+                log.info("Using Scroll API for index: {}", 
sourceIndexInfo.getIndex());
+                ScrollResult scrollResult =
+                        esRestClient.searchByScroll(
+                                sourceIndexInfo.getIndex(),
+                                sourceIndexInfo.getSource(),
+                                sourceIndexInfo.getQuery(),
+                                sourceIndexInfo.getScrollTime(),
+                                sourceIndexInfo.getScrollSize());
+                outputFromScrollResult(scrollResult, sourceIndexInfo, output, 
deserializer);
+                while (scrollResult.getDocs() != null && 
!scrollResult.getDocs().isEmpty()) {
+                    scrollResult =
+                            esRestClient.searchWithScrollId(
+                                    scrollResult.getScrollId(), 
sourceIndexInfo.getScrollTime());
+                    outputFromScrollResult(scrollResult, sourceIndexInfo, 
output, deserializer);
+                }
+            }
+        }
+    }
+
+    /**
+     * Search using Point-in-Time API.
+     *
+     * @param sourceIndexInfo The Elasticsearch configuration
+     * @param output The collector to output rows
+     * @param deserializer The deserializer to convert Elasticsearch records 
to SeaTunnel rows
+     */
+    private void searchWithPointInTime(
+            ElasticsearchConfig sourceIndexInfo,
+            Collector<SeaTunnelRow> output,
+            SeaTunnelRowDeserializer deserializer) {
+
+        // Create a PIT
+        String pitId =
+                esRestClient.createPointInTime(
+                        sourceIndexInfo.getIndex(), 
sourceIndexInfo.getPitKeepAlive());
+        sourceIndexInfo.setPitId(pitId);
+        log.info(
+                "Created Point-in-Time with ID: {} for index: {}",
+                pitId,
+                sourceIndexInfo.getIndex());
+
+        try {
+            // Initial search
+            PointInTimeResult pitResult =
+                    esRestClient.searchWithPointInTime(
+                            pitId,
                             sourceIndexInfo.getSource(),
                             sourceIndexInfo.getQuery(),
-                            sourceIndexInfo.getScrollTime(),
-                            sourceIndexInfo.getScrollSize());
-            outputFromScrollResult(scrollResult, sourceIndexInfo, output, 
deserializer);
-            while (scrollResult.getDocs() != null && 
!scrollResult.getDocs().isEmpty()) {
-                scrollResult =
-                        esRestClient.searchWithScrollId(
-                                scrollResult.getScrollId(), 
sourceIndexInfo.getScrollTime());
-                outputFromScrollResult(scrollResult, sourceIndexInfo, output, 
deserializer);
+                            sourceIndexInfo.getPitBatchSize(),
+                            null, // No search_after for first request
+                            sourceIndexInfo.getPitKeepAlive());
+
+            // Output the results
+            outputFromPitResult(pitResult, sourceIndexInfo, output, 
deserializer);
+
+            // Continue searching while there are more results
+            while (pitResult.isHasMore()) {
+                // Update the PIT ID and search_after values for the next 
request
+                sourceIndexInfo.setPitId(pitResult.getPitId());
+                sourceIndexInfo.setSearchAfter(pitResult.getSearchAfter());
+
+                // Execute the next search
+                pitResult =
+                        esRestClient.searchWithPointInTime(
+                                sourceIndexInfo.getPitId(),
+                                sourceIndexInfo.getSource(),
+                                sourceIndexInfo.getQuery(),
+                                sourceIndexInfo.getPitBatchSize(),
+                                sourceIndexInfo.getSearchAfter(),
+                                sourceIndexInfo.getPitKeepAlive());
+
+                // Output the results
+                outputFromPitResult(pitResult, sourceIndexInfo, output, 
deserializer);
+            }
+        } finally {
+            // Always clean up the PIT when done
+            if (pitId != null) {
+                try {
+                    esRestClient.deletePointInTime(pitId);
+                } catch (Exception e) {
+                    log.warn("Failed to delete Point-in-Time with ID: " + 
pitId, e);
+                }
             }
         }
     }
@@ -141,6 +219,28 @@ public class ElasticsearchSourceReader
         }
     }
 
+    /**
+     * Output rows from a Point-in-Time search result.
+     *
+     * @param pitResult The Point-in-Time search result
+     * @param elasticsearchConfig The Elasticsearch configuration
+     * @param output The collector to output rows
+     * @param deserializer The deserializer to convert Elasticsearch records 
to SeaTunnel rows
+     */
+    private void outputFromPitResult(
+            PointInTimeResult pitResult,
+            ElasticsearchConfig elasticsearchConfig,
+            Collector<SeaTunnelRow> output,
+            SeaTunnelRowDeserializer deserializer) {
+        List<String> source = elasticsearchConfig.getSource();
+        String tableId = 
elasticsearchConfig.getCatalogTable().getTablePath().toString();
+        for (Map<String, Object> doc : pitResult.getDocs()) {
+            SeaTunnelRow seaTunnelRow =
+                    deserializer.deserialize(new ElasticsearchRecord(doc, 
source, tableId));
+            output.collect(seaTunnelRow);
+        }
+    }
+
     @Override
     public List<ElasticsearchSourceSplit> snapshotState(long checkpointId) 
throws Exception {
         return new ArrayList<>(splits);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 61ba8b9093..37de01a474 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -316,6 +316,17 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
         Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
     }
 
+    @TestTemplate
+    public void testElasticsearchWithPIT(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/elasticsearch/elasticsearch_source_with_pit.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        List<String> sinkData = readSinkDataWithSchema("st_index_pit");
+        // for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
+        Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
+    }
+
     @TestTemplate
     public void testElasticsearchWithNestSchema(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_with_pit.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_with_pit.conf
new file mode 100644
index 0000000000..fd52a4aeda
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_with_pit.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of using PIT API in Elasticsearch 
connector
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "st_index"
+    query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
+
+    # Use DSL query with PIT API
+    search_type = "DSL"
+    search_api_type = "PIT"
+    pit_keep_alive = 60000  # 1 minute in milliseconds
+    pit_batch_size = 100
+
+    schema = {
+      fields {
+        c_map = "map<string, tinyint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_int = int
+        c_date = date
+        c_timestamp = timestamp
+        c_null = "null"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Elasticsearch {
+    hosts = ["https://elasticsearch:9200"]
+    username = "elastic"
+    password = "elasticsearch"
+    tls_verify_certificate = false
+    tls_verify_hostname = false
+
+    index = "st_index_pit"
+    index_type = "st"
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
+  }
+}

Reply via email to