This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ed15f0dcf9 [Feature][Connector-V2] Support choosing the start page in
http paging (#7180)
ed15f0dcf9 is described below
commit ed15f0dcf941b22b5ea55bb21ca2b7ceacff07dd
Author: Gxinge <[email protected]>
AuthorDate: Thu Aug 22 10:39:27 2024 +0800
[Feature][Connector-V2] Support choosing the start page in http paging (#7180)
* feature: http paging can specify the start page
* update
* update
* update
---------
Co-authored-by: gaoxi <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
---
docs/en/connector-v2/source/Http.md | 1 +
.../seatunnel/http/config/HttpConfig.java | 5 ++
.../seatunnel/http/source/HttpSource.java | 5 ++
.../seatunnel/http/source/HttpSourceFactory.java | 1 +
.../seatunnel/http/source/HttpSourceReader.java | 4 +-
.../seatunnel/e2e/connector/http/HttpIT.java | 4 ++
.../resources/http_page_increase_start_num.conf | 83 ++++++++++++++++++++++
7 files changed, 101 insertions(+), 2 deletions(-)
diff --git a/docs/en/connector-v2/source/Http.md
b/docs/en/connector-v2/source/Http.md
index 78ceccf442..318b8cf00a 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -52,6 +52,7 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
| pageing.page_field | String | No | - | This parameter
is used to specify the page field name in the request parameter
|
| pageing.total_page_size | Int | No | - | This parameter
is used to control the total number of pages
|
| pageing.batch_size | Int | No | - | The batch size
returned per request is used to determine whether to continue when the total
number of pages is unknown |
+| pageing.start_page_number | Int | No | 1 | Specify the
page number from which synchronization starts
|
| content_json | String | No | - | This parameter
can get some json data.If you only need the data in the 'book' section,
configure `content_field = "$.store.book.*"`. |
| format | String | No | text | The format of
upstream data, now only support `json` `text`, default `text`.
|
| method | String | No | get | Http request
method, only supports GET, POST method.
|
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 043f907e44..489b8d124b 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -44,6 +44,11 @@ public class HttpConfig {
.defaultValue(100)
.withDescription(
"the batch size returned per request is used to
determine whether to continue when the total number of pages is unknown");
+ public static final Option<Long> START_PAGE_NUMBER =
+ Options.key("start_page_number")
+ .longType()
+ .defaultValue(1L)
+ .withDescription("which page to start synchronizing from");
public static final Option<String> PAGE_FIELD =
Options.key("page_field")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index c41e8a9a84..69c87e4b91 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -101,6 +101,11 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
} else {
pageInfo.setTotalPageSize(HttpConfig.TOTAL_PAGE_SIZE.defaultValue());
}
+ if (pageConfig.hasPath(HttpConfig.START_PAGE_NUMBER.key())) {
+
pageInfo.setPageIndex(pageConfig.getLong(HttpConfig.START_PAGE_NUMBER.key()));
+ } else {
+
pageInfo.setPageIndex(HttpConfig.START_PAGE_NUMBER.defaultValue());
+ }
if (pageConfig.hasPath(HttpConfig.BATCH_SIZE.key())) {
pageInfo.setBatchSize(pageConfig.getInt(HttpConfig.BATCH_SIZE.key()));
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
index c0a276d723..853a0f2c69 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceFactory.java
@@ -60,6 +60,7 @@ public class HttpSourceFactory implements TableSourceFactory {
.optional(HttpConfig.PARAMS)
.optional(HttpConfig.FORMAT)
.optional(HttpConfig.BODY)
+ .optional(HttpConfig.PAGEING)
.optional(HttpConfig.JSON_FIELD)
.optional(HttpConfig.CONTENT_FIELD)
.conditional(
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 104d769ef5..051507e8e3 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -158,9 +158,9 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
try {
if (pageInfoOptional.isPresent()) {
noMoreElementFlag = false;
- Long pageIndex = 1L;
+ PageInfo info = pageInfoOptional.get();
+ Long pageIndex = info.getPageIndex();
while (!noMoreElementFlag) {
- PageInfo info = pageInfoOptional.get();
// increment page
info.setPageIndex(pageIndex);
// set request param
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index ab8acd1f86..9e9bed031d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -229,6 +229,10 @@ public class HttpIT extends TestSuiteBase implements
TestResource {
Container.ExecResult execResult18 =
container.executeJob("/httpnoschema_to_http.conf");
Assertions.assertEquals(0, execResult18.getExitCode());
+
+ Container.ExecResult execResult19 =
+ container.executeJob("/http_page_increase_start_num.conf");
+ Assertions.assertEquals(0, execResult19.getExitCode());
}
@DisabledOnContainer(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
new file mode 100644
index 0000000000..60ced88b2d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Http {
+ result_table_name = "http"
+ url = "http://mockserver:1080/query/pages"
+ method = "GET"
+ format = "json"
+ json_field = {
+ name = "$.data[*].name"
+ age = "$.data[*].age"
+ }
+ pageing = {
+ total_page_size = 2
+ page_field = page
+ start_page_number = 2
+ }
+ schema = {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "http"
+ rules {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ ]
+ field_rules = [
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = age
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}