This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 879b1e2d5b [Improve][Connector-V2][Http] Supports Cursor-based
Pagination (#9109) (#9138)
879b1e2d5b is described below
commit 879b1e2d5bbe813413e643e1d6412d09de760e4d
Author: alberne wang <[email protected]>
AuthorDate: Tue Apr 15 16:30:22 2025 +0800
[Improve][Connector-V2][Http] Supports Cursor-based Pagination (#9109)
(#9138)
---
docs/en/connector-v2/source/Http.md | 103 +++++++++++-----
.../{PageInfo.java => HttpPaginationType.java} | 38 ++++--
.../seatunnel/http/config/HttpSourceOptions.java | 23 ++++
.../connectors/seatunnel/http/config/PageInfo.java | 4 +
.../seatunnel/http/source/HttpSource.java | 12 ++
.../seatunnel/http/source/HttpSourceReader.java | 134 +++++++++++++++------
.../seatunnel/e2e/connector/http/HttpIT.java | 4 +
.../resources/http_page_cursor_num_assert.conf | 87 +++++++++++++
.../src/test/resources/mockserver-config.json | 55 +++++++++
9 files changed, 386 insertions(+), 74 deletions(-)
diff --git a/docs/en/connector-v2/source/Http.md
b/docs/en/connector-v2/source/Http.md
index f9d50458df..2183791af9 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -44,33 +44,37 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
## Source Options
-| Name | Type | Required | Default | Description
|
-|-----------------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| url | String | Yes | - | Http request
url.
|
-| schema | Config | No | - | Http and
seatunnel data structure mapping
|
-| schema.fields | Config | No | - | The schema
fields of upstream data
|
-| json_field | Config | No | - | This parameter
helps you configure the schema,so this parameter must be used with schema.
|
-| pageing | Config | No | - | This parameter
is used for paging queries
|
-| pageing.page_field | String | No | - | This parameter
is used to specify the page field name in the request parameter
|
-| pageing.total_page_size | Int | No | - | This parameter
is used to control the total number of pages
|
-| pageing.batch_size | Int | No | - | The batch size
returned per request is used to determine whether to continue when the total
number of pages is unknown
|
-| pageing.start_page_number | Int | No | 1 | Specify the
page number from which synchronization starts
|
-| content_json | String | No | - | This parameter
can get some json data.If you only need the data in the 'book' section,
configure `content_field = "$.store.book.*"`.
|
-| format | String | No | text | The format of
upstream data, now only support `json` `text`, default `text`.
|
-| method | String | No | get | Http request
method, only supports GET, POST method.
|
-| headers | Map | No | - | Http headers.
|
-| params | Map | No | - | Http params.
|
-| body | String | No | - | Http body,the
program will automatically add http header application/json,body is jsonbody.
|
-| poll_interval_millis | Int | No | - | Request http
api interval(millis) in stream mode.
|
-| retry | Int | No | - | The max retry
times if request http return to `IOException`.
|
-| retry_backoff_multiplier_ms | Int | No | 100 | The
retry-backoff times(millis) multiplier if request http failed.
|
-| retry_backoff_max_ms | Int | No | 10000 | The maximum
retry-backoff times(millis) if request http failed
|
-| enable_multi_lines | Boolean | No | false |
|
-| connect_timeout_ms | Int | No | 12000 | Connection
timeout setting, default 12s.
|
-| socket_timeout_ms | Int | No | 60000 | Socket timeout
setting, default 60s.
|
-| common-options | | No | - | Source plugin
common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
|
-| keep_params_as_form | Boolean | No | false |
Whether the params are submitted according to the form, used for compatibility
with legacy behaviors. When true, the value of the params parameter is
submitted through the form. |
-| keep_page_param_as_http_param | Boolean | No | false
| Whether to set the paging parameters to params. For compatibility with
legacy behaviors.
|
+| Name | Type | Required | Default |
Description
|
+|-------------------------------|---------|----------|-------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url | String | Yes | - | Http
request url.
|
+| schema | Config | No | - | Http and
seatunnel data structure mapping
|
+| schema.fields | Config | No | - | The
schema fields of upstream data
|
+| json_field | Config | No | - | This
parameter helps you configure the schema,so this parameter must be used with
schema.
|
+| pageing | Config | No | - | This
parameter is used for paging queries
|
+| pageing.page_field | String | No | - | This
parameter is used to specify the page field name in the request parameter
|
+| pageing.total_page_size | Int | No | - | This
parameter is used to control the total number of pages
|
+| pageing.batch_size | Int | No | - | The batch
size returned per request is used to determine whether to continue when the
total number of pages is unknown
|
+| pageing.start_page_number | Int | No | 1 | Specify
the page number from which synchronization starts
|
+| pageing.page_type | String | No | PageNumber | This
parameter specifies the pagination type. It defaults to `PageNumber` if not
set; only `PageNumber` and `Cursor` are supported. |
+| pageing.cursor_field | String | No | - | This
parameter specifies the name of the cursor field in the request parameters.
|
+| pageing.cursor_response_field | String | No | - | This
parameter specifies the field in the response from which the cursor is
retrieved.
|
+| content_json | String | No | - | This
parameter can get some json data.If you only need the data in the 'book'
section, configure `content_field = "$.store.book.*"`.
|
+| format | String | No | text | The
format of upstream data, now only support `json` `text`, default `text`.
|
+| method | String | No | get | Http
request method, only supports GET, POST method.
|
+| headers | Map | No | - | Http
headers.
|
+| params | Map | No | - | Http
params.
|
+| body | String | No | - | Http
body,the program will automatically add http header application/json,body is
jsonbody.
|
+| poll_interval_millis | Int | No | - | Request
http api interval(millis) in stream mode.
|
+| retry | Int | No | - | The max
retry times if request http return to `IOException`.
|
+| retry_backoff_multiplier_ms | Int | No | 100 | The
retry-backoff times(millis) multiplier if request http failed.
|
+| retry_backoff_max_ms | Int | No | 10000 | The
maximum retry-backoff times(millis) if request http failed
|
+| enable_multi_lines | Boolean | No | false |
|
+| connect_timeout_ms | Int | No | 12000 |
Connection timeout setting, default 12s.
|
+| socket_timeout_ms | Int | No | 60000 | Socket
timeout setting, default 60s.
|
+| common-options | | No | - | Source
plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
|
+| keep_params_as_form | Boolean | No | false |
Whether the params are submitted according to the form, used for compatibility
with legacy behaviors. When true, the value of the params parameter is
submitted through the form. |
+| keep_page_param_as_http_param | Boolean | No | false |
Whether to set the paging parameters to params. For compatibility with legacy
behaviors.
|
+
## How to Create a Http Data Synchronization Jobs
@@ -362,10 +366,15 @@ source {
- See this link for task configuration
[http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf).
### pageing
-When you need to concatenate page param in the URL,then add params.
+The currently supported pagination types are `PageNumber` and `Cursor`.
+If you need to use pagination, you need to configure `pageing`. The default
pagination type is `PageNumber`.
+
+#### 1. PageNumber
+When you need to concatenate page param in the URL,then add params.
When you need to set page param to the body,add the key of page param in body.
+
```hocon
source {
Http {
@@ -378,6 +387,8 @@ source {
page: "${page}"
}
pageing={
+ #this parameter is optional; the default value is PageNumber
+ page_type="PageNumber"
total_page_size=20
page_field=page
#when don't know the total_page_size use batch_size if read
size<batch_size finish ,otherwise continue
@@ -393,6 +404,42 @@ source {
}
+```
+
+#### 2. Cursor
+The `pageing.page_type` parameter must be set to `Cursor`.
+`cursor_field` is the field name of the cursor in the request parameters.
+`cursor_response_field` is the JSONPath of the pagination token field in the
response data; the value read from it is sent as `cursor_field` in the next
request, and paging stops when no cursor is returned.
+```hocon
+
+source {
+ Http {
+ plugin_output = "http"
+ url = "http://localhost:8080/mock/cursor_data"
+ method = "GET"
+ format = "json"
+ content_field = "$.data.*"
+ keep_page_param_as_http_param = true
+ pageing ={
+ page_type="Cursor"
+ cursor_field ="cursor"
+ cursor_response_field="$.paging.cursors.next"
+ }
+ schema = {
+ fields {
+ content=string
+ id=int
+ name=string
+ }
+ }
+ json_field = {
+ content = "$.data[*].content"
+ id = "$.data[*].id"
+ name = "$.data[*].name"
+ }
+ }
+}
+
```
## Changelog
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpPaginationType.java
similarity index 50%
copy from
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
copy to
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpPaginationType.java
index a5c7061347..714444bb96 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpPaginationType.java
@@ -16,20 +16,34 @@
*/
package org.apache.seatunnel.connectors.seatunnel.http.config;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+public enum HttpPaginationType {
+ /** Page number based pagination */
+ PAGE_NUMBER(
+ "PageNumber",
+ "traditional page-number-based pagination,uses a page number and
page size to retrieve a specific slice of data"),
+ /** Cursor based pagination */
+ CURSOR(
+ "Cursor",
+ "token-based cursor pagination,uses a cursor/token to fetch the
next set of data based on a specific point or marker");
-import java.io.Serializable;
+ private final String code;
+ private final String description;
-@Setter
-@Getter
-@ToString
-public class PageInfo implements Serializable {
+ HttpPaginationType(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
- private Long totalPageSize;
+ public String getCode() {
+ return code;
+ }
- private Integer batchSize;
- private String pageField;
- private Long pageIndex;
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public String toString() {
+ return code;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java
index 193f724097..43766cba50 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSourceOptions.java
@@ -66,6 +66,29 @@ public class HttpSourceOptions extends HttpCommonOptions {
public static final Option<Map<String, String>> PAGEING =
Options.key("pageing").mapType().noDefaultValue().withDescription("pageing");
+ public static final Option<HttpPaginationType> PAGE_TYPE =
+ Options.key("page_type")
+ .enumType(HttpPaginationType.class)
+ .defaultValue(HttpPaginationType.PAGE_NUMBER)
+ .withDescription(
+ "this parameter specifies the pagination type and
defaults to `PageNumber` if not explicitly set. "
+ + "Valid options include `PageNumber`
(traditional page-number-based pagination) "
+ + "and `Cursor` (token-based cursor
pagination).");
+
+ public static final Option<String> PAGE_CURSOR_FIELD_NAME =
+ Options.key("cursor_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "this parameter is used to specify the Cursor
field name in the request parameter");
+
+ public static final Option<String> PAGE_CURSOR_RESPONSE_FIELD =
+ Options.key("cursor_response_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "This parameter specifies the field in the
response from which the cursor is retrieved");
+
public static final Option<HttpRequestMethod> METHOD =
Options.key("method")
.enumType(HttpRequestMethod.class)
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
index a5c7061347..f2157b052c 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
@@ -32,4 +32,8 @@ public class PageInfo implements Serializable {
private Integer batchSize;
private String pageField;
private Long pageIndex;
+ private String pageType;
+ private String cursor;
+ private String pageCursorFieldName;
+ private String pageCursorResponseField;
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index a30962fb52..6aba050aa0 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -105,6 +105,18 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
if (pageConfig.hasPath(HttpSourceOptions.PAGE_FIELD.key())) {
pageInfo.setPageField(pageConfig.getString(HttpSourceOptions.PAGE_FIELD.key()));
}
+
+ if (pageConfig.hasPath(HttpSourceOptions.PAGE_TYPE.key())) {
+
pageInfo.setPageType(pageConfig.getString(HttpSourceOptions.PAGE_TYPE.key()));
+ }
+ if
(pageConfig.hasPath(HttpSourceOptions.PAGE_CURSOR_FIELD_NAME.key())) {
+ pageInfo.setPageCursorFieldName(
+
pageConfig.getString(HttpSourceOptions.PAGE_CURSOR_FIELD_NAME.key()));
+ }
+ if
(pageConfig.hasPath(HttpSourceOptions.PAGE_CURSOR_RESPONSE_FIELD.key())) {
+ pageInfo.setPageCursorResponseField(
+
pageConfig.getString(HttpSourceOptions.PAGE_CURSOR_RESPONSE_FIELD.key()));
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index e6b8499292..9469f3f020 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSpl
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import
org.apache.seatunnel.connectors.seatunnel.http.config.HttpPaginationType;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
@@ -149,26 +150,62 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
}
private void updateRequestParam(PageInfo pageInfo) {
- // keep page param as http param
+ // 1. keep page param as http param
if (this.httpParameter.isKeepPageParamAsHttpParam()) {
if (this.httpParameter.getParams() == null) {
httpParameter.setParams(new HashMap<>());
}
- this.httpParameter
- .getParams()
- .put(pageInfo.getPageField(),
pageInfo.getPageIndex().toString());
+ // keep page cursor as http param
+ if (pageInfo.getPageCursorFieldName() != null &&
pageInfo.getCursor() != null) {
+ this.httpParameter
+ .getParams()
+ .put(pageInfo.getPageCursorFieldName(),
pageInfo.getCursor());
+ }
+
+ // keep page index as http param
+ if (pageInfo.getPageField() != null && pageInfo.getPageIndex() !=
null) {
+ this.httpParameter
+ .getParams()
+ .put(pageInfo.getPageField(),
pageInfo.getPageIndex().toString());
+ }
return;
}
- if (MapUtils.isNotEmpty(this.httpParameter.getParams())
- &&
this.httpParameter.getParams().containsKey(pageInfo.getPageField())) {
- this.httpParameter
- .getParams()
- .put(pageInfo.getPageField(),
pageInfo.getPageIndex().toString());
+ // if not set keepPageParamAsHttpParam, but page field is in params,
then set page index as
+ // params
+ if (MapUtils.isNotEmpty(this.httpParameter.getParams())) {
+
+ // set page index as params
+ if
(this.httpParameter.getParams().containsKey(pageInfo.getPageField())) {
+ this.httpParameter
+ .getParams()
+ .put(pageInfo.getPageField(),
pageInfo.getPageIndex().toString());
+ }
+
+ // set page cursor as params
+ if
(this.httpParameter.getParams().containsKey(pageInfo.getPageCursorFieldName())
+ && pageInfo.getCursor() != null) {
+ this.httpParameter
+ .getParams()
+ .put(pageInfo.getPageCursorFieldName(),
pageInfo.getCursor());
+ }
}
- if (MapUtils.isNotEmpty(this.httpParameter.getBody())
- &&
this.httpParameter.getBody().containsKey(pageInfo.getPageField())) {
- this.httpParameter.getBody().put(pageInfo.getPageField(),
pageInfo.getPageIndex());
+
+ // 2. param in body
+ if (MapUtils.isNotEmpty(this.httpParameter.getBody())) {
+
+ // set page index as body
+ if
(this.httpParameter.getBody().containsKey(pageInfo.getPageField())) {
+ this.httpParameter.getBody().put(pageInfo.getPageField(),
pageInfo.getPageIndex());
+ }
+
+ // set page cursor as body
+ if
(this.httpParameter.getBody().containsKey(pageInfo.getPageCursorFieldName())
+ && pageInfo.getCursor() != null) {
+ this.httpParameter
+ .getBody()
+ .put(pageInfo.getPageCursorFieldName(),
pageInfo.getCursor());
+ }
}
}
@@ -185,15 +222,26 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
if (pageInfoOptional.isPresent()) {
noMoreElementFlag = false;
PageInfo info = pageInfoOptional.get();
- Long pageIndex = info.getPageIndex();
- while (!noMoreElementFlag) {
- // increment page
- info.setPageIndex(pageIndex);
- // set request param
- updateRequestParam(info);
- pollAndCollectData(output);
- pageIndex += 1;
- Thread.sleep(10);
+ // cursor pagination
+ if
(HttpPaginationType.CURSOR.getCode().equals(info.getPageType())) {
+ while (!noMoreElementFlag) {
+ updateRequestParam(info);
+ pollAndCollectData(output);
+ Thread.sleep(10);
+ }
+
+ } else {
+ // default page number pagination
+ Long pageIndex = info.getPageIndex();
+ while (!noMoreElementFlag) {
+ // increment page
+ info.setPageIndex(pageIndex);
+ // set request param
+ updateRequestParam(info);
+ pollAndCollectData(output);
+ pageIndex += 1;
+ Thread.sleep(10);
+ }
}
} else {
pollAndCollectData(output);
@@ -212,29 +260,47 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
}
private void collect(Collector<SeaTunnelRow> output, String data) throws
IOException {
+ String contentData = data;
if (contentJson != null) {
- data = JsonUtils.stringToJsonNode(getPartOfJson(data)).toString();
+ contentData =
JsonUtils.stringToJsonNode(getPartOfJson(data)).toString();
}
if (jsonField != null && contentJson == null) {
this.initJsonPath(jsonField);
- data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data),
jsonField)).toString();
+ contentData = JsonUtils.toJsonNode(parseToMap(decodeJSON(data),
jsonField)).toString();
}
- // page increase
+ // page
if (pageInfoOptional.isPresent()) {
- // Determine whether the task is completed by specifying the
presence of the 'total
- // page' field
PageInfo pageInfo = pageInfoOptional.get();
- if (pageInfo.getTotalPageSize() > 0) {
- noMoreElementFlag = pageInfo.getPageIndex() >=
pageInfo.getTotalPageSize();
+
+ // cursor pagination
+ if
(HttpPaginationType.CURSOR.getCode().equals(pageInfo.getPageType())) {
+ // get cursor value from response JSON using the configured response field
+ String cursorResponseField =
pageInfo.getPageCursorResponseField();
+ ReadContext context =
JsonPath.using(jsonConfiguration).parse(data);
+ List<String> cursorList = context.read(cursorResponseField,
List.class);
+ String newCursor = null;
+ if (cursorList != null && !cursorList.isEmpty()) {
+ newCursor = cursorList.get(0);
+ }
+ pageInfo.setCursor(newCursor);
+ // if not present cursor, then no more data
+ noMoreElementFlag = Strings.isNullOrEmpty(newCursor);
} else {
- // no 'total page' configured
- int readSize = JsonUtils.stringToJsonNode(data).size();
- // if read size < BatchSize : read finish
- // if read size = BatchSize : read next page.
- noMoreElementFlag = readSize < pageInfo.getBatchSize();
+ // if not set page pagination is default
+ // Determine whether the task is completed by specifying the
presence of the 'total
+ // page' field
+ if (pageInfo.getTotalPageSize() > 0) {
+ noMoreElementFlag = pageInfo.getPageIndex() >=
pageInfo.getTotalPageSize();
+ } else {
+ // no 'total page' configured
+ int readSize =
JsonUtils.stringToJsonNode(contentData).size();
+ // if read size < BatchSize : read finish
+ // if read size = BatchSize : read next page.
+ noMoreElementFlag = readSize < pageInfo.getBatchSize();
+ }
}
}
- deserializationCollector.collect(data.getBytes(), output);
+ deserializationCollector.collect(contentData.getBytes(), output);
}
private List<Map<String, String>> parseToMap(List<List<String>> datas,
JsonField jsonField) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 6533e2bad8..1d5c0f0afc 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -357,6 +357,10 @@ public class HttpIT extends TestSuiteBase implements
TestResource {
Container.ExecResult execResult19 =
container.executeJob("/http_page_increase_start_num.conf");
Assertions.assertEquals(0, execResult19.getExitCode());
+
+ Container.ExecResult execResult21 =
+ container.executeJob("/http_page_cursor_num_assert.conf");
+ Assertions.assertEquals(0, execResult21.getExitCode());
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_cursor_num_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_cursor_num_assert.conf
new file mode 100644
index 0000000000..19c6cbd56c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_cursor_num_assert.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Http {
+ plugin_output = "http"
+ url = "http://mockserver:1080/query/cursor_pages"
+ method = "GET"
+ format = "json"
+ keep_page_param_as_http_param = true
+ params={
+ cursor: "cursor_1"
+ }
+ pageing = {
+ page_type="Cursor"
+ cursor_field ="cursor"
+ cursor_response_field="$.paging.cursors.next"
+ }
+ json_field = {
+ name = "$.data[*].name"
+ age = "$.data[*].age"
+ }
+ schema = {
+ fields {
+ name = string
+ age = int
+ }
+ }
+}
+}
+
+sink {
+ Assert {
+ plugin_input = "http"
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 4
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 4
+ }
+ ]
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 2fc4a26961..0aae640c0d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4773,5 +4773,60 @@
"Content-Type": "application/json"
}
}
+ },
+ {
+ "httpRequest": {
+ "method": "GET",
+ "path": "/query/cursor_pages",
+ "queryStringParameters": {
+ "cursor": "cursor_1"
+ }
+ },
+ "httpResponse": {
+ "body": {
+ "status": null,
+ "msg": null,
+ "data": [
+ {
+ "name": "name1",
+ "age": 69
+ },
+ {
+ "name": "name2",
+ "age": 51
+ }
+ ],
+ "paging": {
+ "cursors": {
+ "next": "cursor_2"
+ }
+ }
+ }
+ }
+ },
+ {
+ "httpRequest": {
+ "method": "GET",
+ "path": "/query/cursor_pages",
+ "queryStringParameters": {
+ "cursor": "cursor_2"
+ }
+ },
+ "httpResponse": {
+ "body": {
+ "status": null,
+ "msg": null,
+ "data": [
+ {
+ "name": "name3",
+ "age": 45
+ },
+ {
+ "name": "name4",
+ "age": 32
+ }
+ ]
+ }
+ }
}
]
\ No newline at end of file