This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 879b1e2d5b [Improve][Connector-V2][Http] Supports Cursor-based 
Pagination (#9109) (#9138)
879b1e2d5b is described below

commit 879b1e2d5bbe813413e643e1d6412d09de760e4d
Author: alberne wang <[email protected]>
AuthorDate: Tue Apr 15 16:30:22 2025 +0800

    [Improve][Connector-V2][Http] Supports Cursor-based Pagination (#9109) 
(#9138)
---
 docs/en/connector-v2/source/Http.md                | 103 +++++++++++-----
 .../{PageInfo.java => HttpPaginationType.java}     |  38 ++++--
 .../seatunnel/http/config/HttpSourceOptions.java   |  23 ++++
 .../connectors/seatunnel/http/config/PageInfo.java |   4 +
 .../seatunnel/http/source/HttpSource.java          |  12 ++
 .../seatunnel/http/source/HttpSourceReader.java    | 134 +++++++++++++++------
 .../seatunnel/e2e/connector/http/HttpIT.java       |   4 +
 .../resources/http_page_cursor_num_assert.conf     |  87 +++++++++++++
 .../src/test/resources/mockserver-config.json      |  55 +++++++++
 9 files changed, 386 insertions(+), 74 deletions(-)

diff --git a/docs/en/connector-v2/source/Http.md 
b/docs/en/connector-v2/source/Http.md
index f9d50458df..2183791af9 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -44,33 +44,37 @@ They can be downloaded via install-plugin.sh or from the 
Maven central repositor
 
 ## Source Options
 
-|            Name             |  Type   | Required | Default | Description     
                                                                                
                                                                                
  |
-|-----------------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| url                         | String  | Yes      | -       | Http request 
url.                                                                            
                                                                                
     |
-| schema                      | Config  | No       | -       | Http and 
seatunnel data structure mapping                                                
                                                                                
         |
-| schema.fields               | Config  | No       | -       | The schema 
fields of upstream data                                                         
                                                                                
       |
-| json_field                  | Config  | No       | -       | This parameter 
helps you configure the schema,so this parameter must be used with schema.      
                                                                                
   |
-| pageing                     | Config  | No       | -       | This parameter 
is used for paging queries                                                      
                                                                                
   |
-| pageing.page_field          | String  | No       | -       | This parameter 
is used to specify the page field name in the request parameter                 
                                                                                
   |
-| pageing.total_page_size     | Int     | No       | -       | This parameter 
is used to control the total number of pages                                    
                                                                                
   |
-| pageing.batch_size          | Int     | No       | -       | The batch size 
returned per request is used to determine whether to continue when the total 
number of pages is unknown                                                      
      |
-| pageing.start_page_number   | Int     | No       | 1       | Specify the 
page number from which synchronization starts                                   
                                                                                
      |
-| content_json                | String  | No       | -       | This parameter 
can get some json data.If you only need the data in the 'book' section, 
configure `content_field = "$.store.book.*"`.                                   
           |
-| format                      | String  | No       | text    | The format of 
upstream data, now only support `json` `text`, default `text`.                  
                                                                                
    |
-| method                      | String  | No       | get     | Http request 
method, only supports GET, POST method.                                         
                                                                                
     |
-| headers                     | Map     | No       | -       | Http headers.   
                                                                                
                                                                                
  |
-| params                      | Map     | No       | -       | Http params.    
                                                                                
                                                                                
  |
-| body                        | String  | No       | -       | Http body,the 
program will automatically add http header application/json,body is jsonbody.   
                                                                                
    |
-| poll_interval_millis        | Int     | No       | -       | Request http 
api interval(millis) in stream mode.                                            
                                                                                
     |
-| retry                       | Int     | No       | -       | The max retry 
times if request http return to `IOException`.                                  
                                                                                
    |
-| retry_backoff_multiplier_ms | Int     | No       | 100     | The 
retry-backoff times(millis) multiplier if request http failed.                  
                                                                                
              |
-| retry_backoff_max_ms        | Int     | No       | 10000   | The maximum 
retry-backoff times(millis) if request http failed                              
                                                                                
      |
-| enable_multi_lines          | Boolean | No       | false   |                 
                                                                                
                                                                                
  |
-| connect_timeout_ms          | Int     | No       | 12000   | Connection 
timeout setting, default 12s.                                                   
                                                                                
       |
-| socket_timeout_ms           | Int     | No       | 60000   | Socket timeout 
setting, default 60s.                                                           
                                                                                
   |
-| common-options              |         | No       | -       | Source plugin 
common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details                               
                                  |
-| keep_params_as_form         |    Boolean     | No       | false       | 
Whether the params are submitted according to the form, used for compatibility 
with legacy behaviors. When true, the value of the params parameter is 
submitted through the form. |
-| keep_page_param_as_http_param         |    Boolean     | No       | false    
   | Whether to set the paging parameters to params. For compatibility with 
legacy behaviors.                                                               
                                                                   |
+| Name                          |  Type   | Required | Default     | 
Description                                                                     
                                                                                
                  |
+|-------------------------------|---------|----------|-------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                           | String  | Yes      | -           | Http 
request url.                                                                    
                                                                                
             |
+| schema                        | Config  | No       | -           | Http and 
seatunnel data structure mapping                                                
                                                                                
         |
+| schema.fields                 | Config  | No       | -           | The 
schema fields of upstream data                                                  
                                                                                
              |
+| json_field                    | Config  | No       | -           | This 
parameter helps you configure the schema,so this parameter must be used with 
schema.                                                                         
                |
+| pageing                       | Config  | No       | -           | This 
parameter is used for paging queries                                            
                                                                                
             |
+| pageing.page_field            | String  | No       | -           | This 
parameter is used to specify the page field name in the request parameter       
                                                                                
             |
+| pageing.total_page_size       | Int     | No       | -           | This 
parameter is used to control the total number of pages                          
                                                                                
             |
+| pageing.batch_size            | Int     | No       | -           | The batch 
size returned per request is used to determine whether to continue when the 
total number of pages is unknown                                                
            |
+| pageing.start_page_number     | Int     | No       | 1           | Specify 
the page number from which synchronization starts                               
                                                                                
          |
+| pageing.page_type             | String  | No       | PageNumber  | This 
parameter specifies the pagination type; it defaults to `PageNumber` if not set. 
Only `PageNumber` and `Cursor` are supported.                       |
+| pageing.cursor_field          | String  | No       | -           | This 
parameter specifies the name of the cursor field in the request parameters.     
                                                                                
   |
+| pageing.cursor_response_field | String  | No       | -           | This 
parameter specifies the field in the response from which the cursor is 
retrieved.                                                                      
                      |
+| content_json                  | String  | No       | -           | This 
parameter can get some json data.If you only need the data in the 'book' 
section, configure `content_field = "$.store.book.*"`.                          
                    |
+| format                        | String  | No       | text        | The 
format of upstream data, now only support `json` `text`, default `text`.        
                                                                                
              |
+| method                        | String  | No       | get         | Http 
request method, only supports GET, POST method.                                 
                                                                                
             |
+| headers                       | Map     | No       | -           | Http 
headers.                                                                        
                                                                                
             |
+| params                        | Map     | No       | -           | Http 
params.                                                                         
                                                                                
             |
+| body                          | String  | No       | -           | Http 
body,the program will automatically add http header application/json,body is 
jsonbody.                                                                       
                |
+| poll_interval_millis          | Int     | No       | -           | Request 
http api interval(millis) in stream mode.                                       
                                                                                
          |
+| retry                         | Int     | No       | -           | The max 
retry times if request http return to `IOException`.                            
                                                                                
          |
+| retry_backoff_multiplier_ms   | Int     | No       | 100         | The 
retry-backoff times(millis) multiplier if request http failed.                  
                                                                                
              |
+| retry_backoff_max_ms          | Int     | No       | 10000       | The 
maximum retry-backoff times(millis) if request http failed                      
                                                                                
              |
+| enable_multi_lines            | Boolean | No       | false       |           
                                                                                
                                                                                
        |
+| connect_timeout_ms            | Int     | No       | 12000       | 
Connection timeout setting, default 12s.                                        
                                                                                
                  |
+| socket_timeout_ms             | Int     | No       | 60000       | Socket 
timeout setting, default 60s.                                                   
                                                                                
           |
+| common-options                |         | No       | -           | Source 
plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details                               
                                  |
+| keep_params_as_form           |    Boolean  | No       | false       | 
Whether the params are submitted according to the form, used for compatibility 
with legacy behaviors. When true, the value of the params parameter is 
submitted through the form. |
+| keep_page_param_as_http_param |    Boolean  | No       | false       | 
Whether to set the paging parameters to params. For compatibility with legacy 
behaviors.                                                                      
                    |
+
 
 ## How to Create a Http Data Synchronization Jobs
 
@@ -362,10 +366,15 @@ source {
 - See this link for task configuration 
[http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf).
 
 ### pageing
-When you need to concatenate page param in the URL,then add params.
+The currently supported pagination types are `PageNumber` and `Cursor`. 
+To use pagination, you need to configure `pageing`. The default 
pagination type is `PageNumber`.
+
 
+#### 1. PageNumber
+When you need to concatenate page param in the URL,then add params.
 When you need to set page param to the body,add the key of page param in body.
 
+
 ```hocon
 source {
     Http {
@@ -378,6 +387,8 @@ source {
        page: "${page}"
       }
       pageing={
+       # this parameter is optional; the default value is PageNumber
+       page_type="PageNumber"
        total_page_size=20
        page_field=page
        #when don't know the total_page_size use batch_size if read 
size<batch_size finish ,otherwise continue
@@ -393,6 +404,42 @@ source {
 }
 
 
+``` 
+
+#### 2. Cursor
+The `pageing.page_type` parameter must be set to `Cursor`.
+`cursor_field` is the name of the cursor field in the request parameters.
+`cursor_response_field` is the JsonPath of the pagination token (cursor) in the 
response data; the value read from it is sent as `cursor_field` in the next 
request.
+```hocon
+
+source {
+    Http {
+      plugin_output = "http"
+      url = "http://localhost:8080/mock/cursor_data"
+      method = "GET"
+      format = "json"
+      content_field = "$.data.*"
+      keep_page_param_as_http_param = true
+      pageing ={
+        page_type="Cursor"
+        cursor_field ="cursor"
+        cursor_response_field="$.paging.cursors.next"
+      }
+    schema = {
+      fields {
+        content=string
+        id=int
+        name=string
+      }
+    }
+   json_field = {
+    content = "$.data[*].content"
+    id = "$.data[*].id"
+    name = "$.data[*].name"
+   }
+  }
+}
+
 ```
 
 ## Changelog
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpPaginationType.java
similarity index 50%
copy from 
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
copy to 
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpPaginationType.java
index a5c7061347..714444bb96 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpPaginationType.java
@@ -16,20 +16,34 @@
  */
 package org.apache.seatunnel.connectors.seatunnel.http.config;
 
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+public enum HttpPaginationType {
+    /** Page number based pagination */
+    PAGE_NUMBER(
+            "PageNumber",
+            "traditional page-number-based pagination,uses a page number and 
page size to retrieve a specific slice of data"),
+    /** Cursor based pagination */
+    CURSOR(
+            "Cursor",
+            "token-based cursor pagination,uses a cursor/token to fetch the 
next set of data based on a specific point or marker");
 
-import java.io.Serializable;
+    private final String code;
+    private final String description;
 
-@Setter
-@Getter
-@ToString
-public class PageInfo implements Serializable {
+    HttpPaginationType(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
 
-    private Long totalPageSize;
+    public String getCode() {
+        return code;
+    }
 
-    private Integer batchSize;
-    private String pageField;
-    private Long pageIndex;
+    public String getDescription() {
+        return description;
+    }
+
+    @Override
+    public String toString() {
+        return code;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java
index 193f724097..43766cba50 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java
@@ -66,6 +66,29 @@ public class HttpSourceOptions extends HttpCommonOptions {
     public static final Option<Map<String, String>> PAGEING =
             
Options.key("pageing").mapType().noDefaultValue().withDescription("pageing");
 
+    public static final Option<HttpPaginationType> PAGE_TYPE =
+            Options.key("page_type")
+                    .enumType(HttpPaginationType.class)
+                    .defaultValue(HttpPaginationType.PAGE_NUMBER)
+                    .withDescription(
+                            "this parameter specifies the pagination type and 
defaults to `PageNumber` if not explicitly set. "
+                                    + "Valid options include `PageNumber` 
(traditional page-number-based pagination) "
+                                    + "and `Cursor` (token-based cursor 
pagination).");
+
+    public static final Option<String> PAGE_CURSOR_FIELD_NAME =
+            Options.key("cursor_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "this parameter is used to specify the Cursor 
field name in the request parameter");
+
+    public static final Option<String> PAGE_CURSOR_RESPONSE_FIELD =
+            Options.key("cursor_response_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "This parameter specifies the field in the 
response from which the cursor is retrieved");
+
     public static final Option<HttpRequestMethod> METHOD =
             Options.key("method")
                     .enumType(HttpRequestMethod.class)
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
index a5c7061347..f2157b052c 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
@@ -32,4 +32,8 @@ public class PageInfo implements Serializable {
     private Integer batchSize;
     private String pageField;
     private Long pageIndex;
+    private String pageType;
+    private String cursor;
+    private String pageCursorFieldName;
+    private String pageCursorResponseField;
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index a30962fb52..6aba050aa0 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -105,6 +105,18 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
             if (pageConfig.hasPath(HttpSourceOptions.PAGE_FIELD.key())) {
                 
pageInfo.setPageField(pageConfig.getString(HttpSourceOptions.PAGE_FIELD.key()));
             }
+
+            if (pageConfig.hasPath(HttpSourceOptions.PAGE_TYPE.key())) {
+                
pageInfo.setPageType(pageConfig.getString(HttpSourceOptions.PAGE_TYPE.key()));
+            }
+            if 
(pageConfig.hasPath(HttpSourceOptions.PAGE_CURSOR_FIELD_NAME.key())) {
+                pageInfo.setPageCursorFieldName(
+                        
pageConfig.getString(HttpSourceOptions.PAGE_CURSOR_FIELD_NAME.key()));
+            }
+            if 
(pageConfig.hasPath(HttpSourceOptions.PAGE_CURSOR_RESPONSE_FIELD.key())) {
+                pageInfo.setPageCursorResponseField(
+                        
pageConfig.getString(HttpSourceOptions.PAGE_CURSOR_RESPONSE_FIELD.key()));
+            }
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index e6b8499292..9469f3f020 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -28,6 +28,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSpl
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import 
org.apache.seatunnel.connectors.seatunnel.http.config.HttpPaginationType;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
 import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
@@ -149,26 +150,62 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     }
 
     private void updateRequestParam(PageInfo pageInfo) {
-        // keep page param as http param
+        // 1. keep page param as http param
         if (this.httpParameter.isKeepPageParamAsHttpParam()) {
             if (this.httpParameter.getParams() == null) {
                 httpParameter.setParams(new HashMap<>());
             }
-            this.httpParameter
-                    .getParams()
-                    .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
+            // keep page cursor as http param
+            if (pageInfo.getPageCursorFieldName() != null && 
pageInfo.getCursor() != null) {
+                this.httpParameter
+                        .getParams()
+                        .put(pageInfo.getPageCursorFieldName(), 
pageInfo.getCursor());
+            }
+
+            // keep page index as http param
+            if (pageInfo.getPageField() != null && pageInfo.getPageIndex() != 
null) {
+                this.httpParameter
+                        .getParams()
+                        .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
+            }
             return;
         }
 
-        if (MapUtils.isNotEmpty(this.httpParameter.getParams())
-                && 
this.httpParameter.getParams().containsKey(pageInfo.getPageField())) {
-            this.httpParameter
-                    .getParams()
-                    .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
+        // if not set keepPageParamAsHttpParam, but page field is in params, 
then set page index as
+        // params
+        if (MapUtils.isNotEmpty(this.httpParameter.getParams())) {
+
+            // set page index as params
+            if 
(this.httpParameter.getParams().containsKey(pageInfo.getPageField())) {
+                this.httpParameter
+                        .getParams()
+                        .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
+            }
+
+            // set page cursor as params
+            if 
(this.httpParameter.getParams().containsKey(pageInfo.getPageCursorFieldName())
+                    && pageInfo.getCursor() != null) {
+                this.httpParameter
+                        .getParams()
+                        .put(pageInfo.getPageCursorFieldName(), 
pageInfo.getCursor());
+            }
         }
-        if (MapUtils.isNotEmpty(this.httpParameter.getBody())
-                && 
this.httpParameter.getBody().containsKey(pageInfo.getPageField())) {
-            this.httpParameter.getBody().put(pageInfo.getPageField(), 
pageInfo.getPageIndex());
+
+        // 2. param in body
+        if (MapUtils.isNotEmpty(this.httpParameter.getBody())) {
+
+            // set page index as body
+            if 
(this.httpParameter.getBody().containsKey(pageInfo.getPageField())) {
+                this.httpParameter.getBody().put(pageInfo.getPageField(), 
pageInfo.getPageIndex());
+            }
+
+            // set page cursor as body
+            if 
(this.httpParameter.getBody().containsKey(pageInfo.getPageCursorFieldName())
+                    && pageInfo.getCursor() != null) {
+                this.httpParameter
+                        .getBody()
+                        .put(pageInfo.getPageCursorFieldName(), 
pageInfo.getCursor());
+            }
         }
     }
 
@@ -185,15 +222,26 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
             if (pageInfoOptional.isPresent()) {
                 noMoreElementFlag = false;
                 PageInfo info = pageInfoOptional.get();
-                Long pageIndex = info.getPageIndex();
-                while (!noMoreElementFlag) {
-                    // increment page
-                    info.setPageIndex(pageIndex);
-                    // set request param
-                    updateRequestParam(info);
-                    pollAndCollectData(output);
-                    pageIndex += 1;
-                    Thread.sleep(10);
+                // cursor pagination
+                if 
(HttpPaginationType.CURSOR.getCode().equals(info.getPageType())) {
+                    while (!noMoreElementFlag) {
+                        updateRequestParam(info);
+                        pollAndCollectData(output);
+                        Thread.sleep(10);
+                    }
+
+                } else {
+                    // default page number pagination
+                    Long pageIndex = info.getPageIndex();
+                    while (!noMoreElementFlag) {
+                        // increment page
+                        info.setPageIndex(pageIndex);
+                        // set request param
+                        updateRequestParam(info);
+                        pollAndCollectData(output);
+                        pageIndex += 1;
+                        Thread.sleep(10);
+                    }
                 }
             } else {
                 pollAndCollectData(output);
@@ -212,29 +260,47 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
     }
 
     private void collect(Collector<SeaTunnelRow> output, String data) throws IOException {
+        String contentData = data;
         if (contentJson != null) {
-            data = JsonUtils.stringToJsonNode(getPartOfJson(data)).toString();
+            contentData = JsonUtils.stringToJsonNode(getPartOfJson(data)).toString();
         }
         if (jsonField != null && contentJson == null) {
             this.initJsonPath(jsonField);
-            data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();
+            contentData = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();
         }
-        // page increase
+        // page
         if (pageInfoOptional.isPresent()) {
-            // Determine whether the task is completed by specifying the presence of the 'total
-            // page' field
             PageInfo pageInfo = pageInfoOptional.get();
-            if (pageInfo.getTotalPageSize() > 0) {
-                noMoreElementFlag = pageInfo.getPageIndex() >= pageInfo.getTotalPageSize();
+
+            // cursor pagination
+            if (HttpPaginationType.CURSOR.getCode().equals(pageInfo.getPageType())) {
+                // get cursor value from response JSON with fileName
+                String cursorResponseField = pageInfo.getPageCursorResponseField();
+                ReadContext context = JsonPath.using(jsonConfiguration).parse(data);
+                List<String> cursorList = context.read(cursorResponseField, List.class);
+                String newCursor = null;
+                if (cursorList != null && !cursorList.isEmpty()) {
+                    newCursor = cursorList.get(0);
+                }
+                pageInfo.setCursor(newCursor);
+                // if not present cursor, then no more data
+                noMoreElementFlag = Strings.isNullOrEmpty(newCursor);
             } else {
-                // no 'total page' configured
-                int readSize = JsonUtils.stringToJsonNode(data).size();
-                // if read size < BatchSize : read finish
-                // if read size = BatchSize : read next page.
-                noMoreElementFlag = readSize < pageInfo.getBatchSize();
+                // if not set page pagination is default
+                // Determine whether the task is completed by specifying the presence of the 'total
+                // page' field
+                if (pageInfo.getTotalPageSize() > 0) {
+                    noMoreElementFlag = pageInfo.getPageIndex() >= pageInfo.getTotalPageSize();
+                } else {
+                    // no 'total page' configured
+                    int readSize = JsonUtils.stringToJsonNode(contentData).size();
+                    // if read size < BatchSize : read finish
+                    // if read size = BatchSize : read next page.
+                    noMoreElementFlag = readSize < pageInfo.getBatchSize();
+                }
             }
         }
-        deserializationCollector.collect(data.getBytes(), output);
+        deserializationCollector.collect(contentData.getBytes(), output);
     }
 
     private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField jsonField) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 6533e2bad8..1d5c0f0afc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -357,6 +357,10 @@ public class HttpIT extends TestSuiteBase implements TestResource {
         Container.ExecResult execResult19 =
                 container.executeJob("/http_page_increase_start_num.conf");
         Assertions.assertEquals(0, execResult19.getExitCode());
+
+        Container.ExecResult execResult21 =
+                container.executeJob("/http_page_cursor_num_assert.conf");
+        Assertions.assertEquals(0, execResult21.getExitCode());
     }
 
     @TestTemplate
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_cursor_num_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_cursor_num_assert.conf
new file mode 100644
index 0000000000..19c6cbd56c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_cursor_num_assert.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Http {
+    plugin_output = "http"
+    url = "http://mockserver:1080/query/cursor_pages"
+    method = "GET"
+    format = "json"
+    keep_page_param_as_http_param = true
+    params={
+     cursor: "cursor_1"
+    }
+    pageing = {
+       page_type="Cursor"
+       cursor_field ="cursor"
+       cursor_response_field="$.paging.cursors.next"
+    }
+    json_field = {
+          name = "$.data[*].name"
+          age = "$.data[*].age"
+        }
+  schema = {
+         fields {
+           name = string
+           age = int
+         }
+       }
+}
+}
+
+sink {
+  Assert {
+    plugin_input = "http"
+    rules {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 4
+        },
+        {
+          rule_type = MAX_ROW
+          rule_value = 4
+        }
+      ]
+      field_rules = [
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = age
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 2fc4a26961..0aae640c0d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4773,5 +4773,60 @@
         "Content-Type": "application/json"
       }
     }
+  },
+  {
+    "httpRequest": {
+      "method": "GET",
+      "path": "/query/cursor_pages",
+      "queryStringParameters": {
+       "cursor": "cursor_1"
+      }
+    },
+    "httpResponse": {
+      "body": {
+        "status": null,
+        "msg": null,
+        "data": [
+          {
+            "name": "name1",
+            "age": 69
+          },
+          {
+            "name": "name2",
+            "age": 51
+          }
+        ],
+        "paging": {
+          "cursors": {
+            "next": "cursor_2"
+          }
+        }
+      }
+    }
+  },
+  {
+    "httpRequest": {
+      "method": "GET",
+      "path": "/query/cursor_pages",
+      "queryStringParameters": {
+        "cursor": "cursor_2"
+      }
+    },
+    "httpResponse": {
+      "body": {
+        "status": null,
+        "msg": null,
+        "data": [
+          {
+            "name": "name3",
+            "age": 45
+          },
+          {
+            "name": "name4",
+            "age": 32
+          }
+        ]
+      }
+    }
   }
 ]
\ No newline at end of file


Reply via email to