This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c1b2675ab0 [Fix][connector-http] fix when post have param (#8434)
c1b2675ab0 is described below
commit c1b2675ab0826bc90a16766b1811f9f3dc4c2c0c
Author: CosmosNi <[email protected]>
AuthorDate: Mon Feb 24 10:15:59 2025 +0800
[Fix][connector-http] fix when post have param (#8434)
---
docs/en/connector-v2/source/Http.md | 104 +++++++++++++++------
.../seatunnel/http/client/HttpClientProvider.java | 66 +++++++++++--
.../seatunnel/http/config/HttpConfig.java | 11 +++
.../seatunnel/http/config/HttpParameter.java | 23 ++++-
.../seatunnel/http/source/HttpSourceReader.java | 30 ++++--
.../seatunnel/myhours/source/MyHoursSource.java | 6 +-
.../source/config/MyHoursSourceParameter.java | 6 +-
.../prometheus/source/PrometheusSourceReader.java | 3 +-
.../seatunnel/e2e/connector/http/HttpIT.java | 9 ++
.../resources/http_formrequestbody_to_assert.conf | 1 +
...t.conf => http_formrequestbody_to_assert2.conf} | 5 +-
.../resources/http_page_increase_no_page_num.conf | 1 +
.../resources/http_page_increase_page_num.conf | 1 +
.../resources/http_page_increase_start_num.conf | 1 +
...rt.conf => http_post_param_json_to_assert.conf} | 9 +-
.../src/test/resources/mockserver-config.json | 31 +++++-
16 files changed, 253 insertions(+), 54 deletions(-)
diff --git a/docs/en/connector-v2/source/Http.md
b/docs/en/connector-v2/source/Http.md
index 511ba04132..88b6a67504 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -42,31 +42,33 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
## Source Options
-| Name | Type | Required | Default |
Description
|
-|-----------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------|
-| url | String | Yes | - | Http request
url.
|
-| schema | Config | No | - | Http and
seatunnel data structure mapping
|
-| schema.fields | Config | No | - | The schema
fields of upstream data
|
-| json_field | Config | No | - | This parameter
helps you configure the schema,so this parameter must be used with schema.
|
-| pageing | Config | No | - | This parameter
is used for paging queries
|
-| pageing.page_field | String | No | - | This parameter
is used to specify the page field name in the request parameter
|
-| pageing.total_page_size | Int | No | - | This parameter
is used to control the total number of pages
|
-| pageing.batch_size | Int | No | - | The batch size
returned per request is used to determine whether to continue when the total
number of pages is unknown |
-| pageing.start_page_number | Int | No | 1 | Specify the
page number from which synchronization starts
|
-| content_json | String | No | - | This parameter
can get some json data.If you only need the data in the 'book' section,
configure `content_field = "$.store.book.*"`. |
-| format | String | No | text | The format of
upstream data, now only support `json` `text`, default `text`.
|
-| method | String | No | get | Http request
method, only supports GET, POST method.
|
-| headers | Map | No | - | Http headers.
|
-| params | Map | No | - | Http params,the
program will automatically add http header application/x-www-form-urlencoded.
|
-| body | String | No | - | Http body,the
program will automatically add http header application/json,body is jsonbody.
|
-| poll_interval_millis | Int | No | - | Request http
api interval(millis) in stream mode.
|
-| retry | Int | No | - | The max retry
times if request http return to `IOException`.
|
-| retry_backoff_multiplier_ms | Int | No | 100 | The
retry-backoff times(millis) multiplier if request http failed.
|
-| retry_backoff_max_ms | Int | No | 10000 | The maximum
retry-backoff times(millis) if request http failed
|
-| enable_multi_lines | Boolean | No | false |
|
-| connect_timeout_ms | Int | No | 12000 | Connection
timeout setting, default 12s.
|
-| socket_timeout_ms | Int | No | 60000 | Socket timeout
setting, default 60s.
|
-| common-options | | No | - | Source plugin
common parameters, please refer to [Source Common
Options](../source-common-options.md) for details |
+| Name | Type | Required | Default | Description
|
+|-----------------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url | String | Yes | - | Http request
url.
|
+| schema | Config | No | - | Http and
seatunnel data structure mapping
|
+| schema.fields | Config | No | - | The schema
fields of upstream data
|
+| json_field | Config | No | - | This parameter
helps you configure the schema,so this parameter must be used with schema.
|
+| pageing | Config | No | - | This parameter
is used for paging queries
|
+| pageing.page_field | String | No | - | This parameter
is used to specify the page field name in the request parameter
|
+| pageing.total_page_size | Int | No | - | This parameter
is used to control the total number of pages
|
+| pageing.batch_size | Int | No | - | The batch size
returned per request is used to determine whether to continue when the total
number of pages is unknown
|
+| pageing.start_page_number | Int | No | 1 | Specify the
page number from which synchronization starts
|
+| content_json | String | No | - | This parameter
can get some json data.If you only need the data in the 'book' section,
configure `content_field = "$.store.book.*"`.
|
+| format | String | No | text | The format of
upstream data, now only support `json` `text`, default `text`.
|
+| method | String | No | get | Http request
method, only supports GET, POST method.
|
+| headers | Map | No | - | Http headers.
|
+| params | Map | No | - | Http params.
|
+| body | String | No | - | Http body,the
program will automatically add http header application/json,body is jsonbody.
|
+| poll_interval_millis | Int | No | - | Request http
api interval(millis) in stream mode.
|
+| retry | Int | No | - | The max retry
times if request http return to `IOException`.
|
+| retry_backoff_multiplier_ms | Int | No | 100 | The
retry-backoff times(millis) multiplier if request http failed.
|
+| retry_backoff_max_ms | Int | No | 10000 | The maximum
retry-backoff times(millis) if request http failed
|
+| enable_multi_lines | Boolean | No | false |
|
+| connect_timeout_ms | Int | No | 12000 | Connection
timeout setting, default 12s.
|
+| socket_timeout_ms | Int | No | 60000 | Socket timeout
setting, default 60s.
|
+| common-options | | No | - | Source plugin
common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
|
+| keep_params_as_form | Boolean | No | false |
Whether the params are submitted according to the form, used for compatibility
with legacy behaviors. When true, the value of the params parameter is
submitted through the form. |
+| keep_page_param_as_http_param | Boolean | No | false
| Whether to set the paging parameters to params. For compatibility with
legacy behaviors.
|
## How to Create a Http Data Synchronization Jobs
@@ -181,6 +183,46 @@ connector will generate data as the following:
|----------------------------------------------------------|
| {"code": 200, "data": "get success", "success": true} |
+### keep_params_as_form
+For compatibility with old versions of http.
+When set to true,`<params>` and `<pageing>` will be submitted in the form.
+When set to false,`<params>` will be added to the url path,and `<pageing>`
will not be added to the body or form. It will replace placeholders in params
and body.
+
+### keep_page_param_as_http_param
+Whether to set the paging parameters to params.
+When set to true,`<pageing>` is set to `<params>`.
+When set to false,When the page field exists in `<body>` or `<params>`,
replace value.
+
+When set to false,config example:
+```hocon
+body="""{"id":1,"page":"${page}"}"""
+```
+
+```hocon
+params={
+ page: "${page}"
+}
+```
+
+### params
+By default, the parameters will be added to the url path.
+If you need to keep the old version behavior, please check keep_params_as_form.
+
+### body
+The HTTP body is used to carry the actual data in requests or responses,
including JSON, form submissions.
+
+The reference format is as follows:
+```hocon
+body="{"id":1,"name":"setunnel"}"
+```
+
+For form submissions,please set the content-type as follows.
+```hocon
+headers {
+ Content-Type = "application/x-www-form-urlencoded"
+}
+```
+
### content_json
This parameter can get some json data.If you only need the data in the 'book'
section, configure `content_field = "$.store.book.*"`.
@@ -318,17 +360,21 @@ source {
- See this link for task configuration
[http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf).
### pageing
+When you need to concatenate page param in the URL,then add params.
+
+When you need to set page param to the body,add the key of page param in body.
```hocon
source {
Http {
url = "http://localhost:8080/mock/queryData"
- method = "GET"
+ method = "POST"
format = "json"
+ body="""{"id":1,"page":"${page}"}"""
+ content_field = "$.data.*"
params={
page: "${page}"
}
- content_field = "$.data.*"
pageing={
total_page_size=20
page_field=page
@@ -344,6 +390,7 @@ source {
}
}
+
```
## Changelog
@@ -354,5 +401,4 @@ source {
### new version
-- [Feature][Connector-V2][HTTP] Use json-path parsing
([3510](https://github.com/apache/seatunnel/pull/3510))
-
+- [Feature][Connector-V2][HTTP] Use json-path parsing
([3510](https://github.com/apache/seatunnel/pull/3510))
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
index cbea79a15a..1fb4a54cad 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
@@ -17,8 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.http.client;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.HttpStatus;
@@ -67,6 +69,7 @@ import java.util.concurrent.TimeUnit;
public class HttpClientProvider implements AutoCloseable {
private static final String ENCODING = "UTF-8";
private static final String APPLICATION_JSON = "application/json";
+ private static final String APPLICATION_FORM =
"application/x-www-form-urlencoded";
private static final int INITIAL_CAPACITY = 16;
private RequestConfig requestConfig;
private final CloseableHttpClient httpClient;
@@ -115,11 +118,26 @@ public class HttpClientProvider implements AutoCloseable {
String method,
Map<String, String> headers,
Map<String, String> params,
- String body)
+ Map<String, Object> body,
+ boolean keepParamsAsForm)
throws Exception {
// convert method option to uppercase
method = method.toUpperCase(Locale.ROOT);
+ // Keep the original post logic
+ if (HttpPost.METHOD_NAME.equals(method) && keepParamsAsForm) {
+ // Compatible with old versions
+ if (MapUtils.isNotEmpty(params)) {
+ headers = MapUtils.isEmpty(headers) ? new HashMap<>() :
headers;
+ headers.putIfAbsent(HTTP.CONTENT_TYPE, APPLICATION_FORM);
+ }
+ if (MapUtils.isEmpty(body)) {
+ body = new HashMap<>();
+ }
+ body.putAll(params);
+ return doPost(url, headers, Collections.emptyMap(), body);
+ }
if (HttpPost.METHOD_NAME.equals(method)) {
+ // Create access address
return doPost(url, headers, params, body);
}
if (HttpGet.METHOD_NAME.equals(method)) {
@@ -292,7 +310,6 @@ public class HttpClientProvider implements AutoCloseable {
/**
* Send a post request with request headers , request parameters and
request body
*
- * @param url request address
* @param headers request header map
* @param params request parameter map
* @param body request body
@@ -300,16 +317,19 @@ public class HttpClientProvider implements AutoCloseable {
* @throws Exception information
*/
public HttpResponse doPost(
- String url, Map<String, String> headers, Map<String, String>
params, String body)
+ String url,
+ Map<String, String> headers,
+ Map<String, String> params,
+ Map<String, Object> body)
throws Exception {
- // create a new http get
- HttpPost httpPost = new HttpPost(url);
+ URIBuilder uriBuilder = new URIBuilder(url);
+ // add parameter to uri
+ addParameters(uriBuilder, params);
+ HttpPost httpPost = new HttpPost(uriBuilder.build());
// set default request config
httpPost.setConfig(requestConfig);
// set request header
addHeaders(httpPost, headers);
- // set request params
- addParameters(httpPost, params);
// add body in request
addBody(httpPost, body);
// return http response
@@ -429,6 +449,38 @@ public class HttpClientProvider implements AutoCloseable {
headers.forEach(request::addHeader);
}
+ private void addBody(HttpEntityEnclosingRequestBase request, Map<String,
Object> body)
+ throws UnsupportedEncodingException {
+ if (MapUtils.isEmpty(body)) {
+ body = new HashMap<>();
+ }
+ boolean isFormSubmit =
+ request.getHeaders(HTTP.CONTENT_TYPE) != null
+ && request.getHeaders(HTTP.CONTENT_TYPE).length > 0
+ && APPLICATION_FORM.equalsIgnoreCase(
+
request.getHeaders(HTTP.CONTENT_TYPE)[0].getValue());
+ if (isFormSubmit) {
+ if (MapUtils.isNotEmpty(body)) {
+ List<NameValuePair> parameters = new ArrayList<>();
+ Set<Map.Entry<String, Object>> entrySet = body.entrySet();
+ for (Map.Entry<String, Object> e : entrySet) {
+ String name = e.getKey();
+ String value = e.getValue().toString();
+ NameValuePair pair = new BasicNameValuePair(name, value);
+ parameters.add(pair);
+ }
+ // Set to the request's http object
+ request.setEntity(new UrlEncodedFormEntity(parameters,
ENCODING));
+ }
+ } else {
+ request.addHeader(HTTP.CONTENT_TYPE, APPLICATION_JSON);
+ StringEntity entity =
+ new StringEntity(JsonUtils.toJsonString(body),
ContentType.APPLICATION_JSON);
+ entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE,
APPLICATION_JSON));
+ request.setEntity(entity);
+ }
+ }
+
private boolean checkAlreadyHaveContentType(HttpEntityEnclosingRequestBase
request) {
if (request.getEntity() != null &&
request.getEntity().getContentType() != null) {
return
HTTP.CONTENT_TYPE.equals(request.getEntity().getContentType().getName());
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 489b8d124b..745534483f 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -33,6 +33,17 @@ public class HttpConfig {
public static final int DEFAULT_SOCKET_TIMEOUT_MS = 6000 * 10;
public static final Option<String> URL =
Options.key("url").stringType().noDefaultValue().withDescription("Http request
url");
+ public static final Option<Boolean> KEEP_PARAMS_AS_FORM =
+ Options.key("keep_params_as_form")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Keep param as form");
+ public static final Option<Boolean> KEEP_PAGE_PARAM_AS_HTTP_PARAM =
+ Options.key("keep_page_param_as_http_param")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("keep page param as http param");
+
public static final Option<Long> TOTAL_PAGE_SIZE =
Options.key("total_page_size")
.longType()
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
index e4dffb539a..ca999ad8c7 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.http.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import lombok.Data;
@@ -32,7 +33,10 @@ public class HttpParameter implements Serializable {
protected HttpRequestMethod method;
protected Map<String, String> headers;
protected Map<String, String> params;
- protected String body;
+ protected Map<String, Object> pageParams;
+ protected boolean keepParamsAsForm = false;
+ protected boolean keepPageParamAsHttpParam = false;
+ protected Map<String, Object> body;
protected int pollIntervalMillis;
protected int retry;
protected int retryBackoffMultiplierMillis =
HttpConfig.DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS;
@@ -44,6 +48,13 @@ public class HttpParameter implements Serializable {
public void buildWithConfig(Config pluginConfig) {
// set url
this.setUrl(pluginConfig.getString(HttpConfig.URL.key()));
+ if (pluginConfig.hasPath(HttpConfig.KEEP_PARAMS_AS_FORM.key())) {
+
this.setKeepParamsAsForm(pluginConfig.getBoolean(HttpConfig.KEEP_PARAMS_AS_FORM.key()));
+ }
+ if
(pluginConfig.hasPath(HttpConfig.KEEP_PAGE_PARAM_AS_HTTP_PARAM.key())) {
+ this.setKeepPageParamAsHttpParam(
+
pluginConfig.getBoolean(HttpConfig.KEEP_PAGE_PARAM_AS_HTTP_PARAM.key()));
+ }
// set method
if (pluginConfig.hasPath(HttpConfig.METHOD.key())) {
HttpRequestMethod httpRequestMethod =
@@ -75,7 +86,15 @@ public class HttpParameter implements Serializable {
}
// set body
if (pluginConfig.hasPath(HttpConfig.BODY.key())) {
- this.setBody(pluginConfig.getString(HttpConfig.BODY.key()));
+
+ this.setBody(
+
ConfigFactory.parseString(pluginConfig.getString(HttpConfig.BODY.key()))
+ .entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
entry.getValue().unwrapped(),
+ (v1, v2) -> v2)));
}
if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS.key())) {
this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS.key()));
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 7345690569..e6b8499292 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -34,6 +34,8 @@ import
org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
import
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
+import org.apache.commons.collections4.MapUtils;
+
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
@@ -116,7 +118,8 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
this.httpParameter.getMethod().getMethod(),
this.httpParameter.getHeaders(),
this.httpParameter.getParams(),
- this.httpParameter.getBody());
+ this.httpParameter.getBody(),
+ this.httpParameter.isKeepParamsAsForm());
if (response.getCode() >= 200 && response.getCode() <= 207) {
String content = response.getContent();
if (!Strings.isNullOrEmpty(content)) {
@@ -146,12 +149,27 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
}
private void updateRequestParam(PageInfo pageInfo) {
- if (this.httpParameter.getParams() == null) {
- httpParameter.setParams(new HashMap<>());
+ // keep page param as http param
+ if (this.httpParameter.isKeepPageParamAsHttpParam()) {
+ if (this.httpParameter.getParams() == null) {
+ httpParameter.setParams(new HashMap<>());
+ }
+ this.httpParameter
+ .getParams()
+ .put(pageInfo.getPageField(),
pageInfo.getPageIndex().toString());
+ return;
+ }
+
+ if (MapUtils.isNotEmpty(this.httpParameter.getParams())
+ &&
this.httpParameter.getParams().containsKey(pageInfo.getPageField())) {
+ this.httpParameter
+ .getParams()
+ .put(pageInfo.getPageField(),
pageInfo.getPageIndex().toString());
+ }
+ if (MapUtils.isNotEmpty(this.httpParameter.getBody())
+ &&
this.httpParameter.getBody().containsKey(pageInfo.getPageField())) {
+ this.httpParameter.getBody().put(pageInfo.getPageField(),
pageInfo.getPageIndex());
}
- this.httpParameter
- .getParams()
- .put(pageInfo.getPageField(),
pageInfo.getPageIndex().toString());
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
index 6ac7e88258..6841d15943 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
@@ -40,6 +40,7 @@ import
org.apache.seatunnel.connectors.seatunnel.myhours.source.exception.MyHour
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
@Slf4j
@@ -89,7 +90,10 @@ public class MyHoursSource extends HttpSource {
try {
HttpResponse response =
loginHttpClient.doPost(
- myHoursLoginParameter.getUrl(),
myHoursLoginParameter.getBody());
+ myHoursLoginParameter.getUrl(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ myHoursLoginParameter.getBody());
if (HttpResponse.STATUS_OK == response.getCode()) {
String content = response.getContent();
if (!Strings.isNullOrEmpty(content)) {
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
index 08a802db41..2171c70a29 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
@@ -19,7 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.myhours.source.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;
@@ -43,15 +42,14 @@ public class MyHoursSourceParameter extends HttpParameter {
// set method
this.setMethod(HttpRequestMethod.valueOf(MyHoursSourceConfig.POST));
// set body
- Map<String, String> bodyParams = new HashMap<>();
+ Map<String, Object> bodyParams = new HashMap<>();
String email = pluginConfig.getString(MyHoursSourceConfig.EMAIL.key());
String password =
pluginConfig.getString(MyHoursSourceConfig.PASSWORD.key());
bodyParams.put(MyHoursSourceConfig.GRANT_TYPE,
MyHoursSourceConfig.PASSWORD.key());
bodyParams.put(MyHoursSourceConfig.EMAIL.key(), email);
bodyParams.put(MyHoursSourceConfig.PASSWORD.key(), password);
bodyParams.put(MyHoursSourceConfig.CLIENT_ID, MyHoursSourceConfig.API);
- String body = JsonUtils.toJsonString(bodyParams);
- this.setBody(body);
+ this.setBody(bodyParams);
this.setRetryParameters(pluginConfig);
}
}
diff --git
a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java
b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java
index 3734ebed49..c89cf72427 100644
---
a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java
+++
b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java
@@ -195,7 +195,8 @@ public class PrometheusSourceReader extends
AbstractSingleSplitReader<SeaTunnelR
this.httpParameter.getMethod().getMethod(),
this.httpParameter.getHeaders(),
this.httpParameter.getParams(),
- this.httpParameter.getBody());
+ this.httpParameter.getBody(),
+ this.httpParameter.isKeepParamsAsForm());
if (response.getCode() >= 200 && response.getCode() <= 207) {
String content = response.getContent();
if (!Strings.isNullOrEmpty(content)) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 5e017f3636..6533e2bad8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -271,6 +271,11 @@ public class HttpIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testSourceToAssertSink(TestContainer container)
throws IOException, InterruptedException {
+ // dynamic param for body
+ Container.ExecResult execResult0 =
+ container.executeJob("/http_post_param_json_to_assert.conf");
+ Assertions.assertEquals(0, execResult0.getExitCode());
+
// normal http
Container.ExecResult execResult1 =
container.executeJob("/http_json_to_assert.conf");
Assertions.assertEquals(0, execResult1.getExitCode());
@@ -325,6 +330,10 @@ public class HttpIT extends TestSuiteBase implements
TestResource {
container.executeJob("/http_formrequestbody_to_assert.conf");
Assertions.assertEquals(0, execResult13.getExitCode());
+ Container.ExecResult execResult20 =
+ container.executeJob("/http_formrequestbody_to_assert2.conf");
+ Assertions.assertEquals(0, execResult20.getExitCode());
+
// http httpJsonRequestBody
Container.ExecResult execResult14 =
container.executeJob("/http_jsonrequestbody_to_assert.conf");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
index a8793f73b7..f10cffa8f0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
@@ -25,6 +25,7 @@ source {
plugin_output = "http"
url = "http://mockserver:1080/example/formBody"
method = "POST"
+ keep_params_as_form = true
params ={id = 1}
format = "json"
schema = {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert2.conf
similarity index 95%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert2.conf
index a8793f73b7..218dcf711c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert2.conf
@@ -25,7 +25,10 @@ source {
plugin_output = "http"
url = "http://mockserver:1080/example/formBody"
method = "POST"
- params ={id = 1}
+ headers {
+ Content-Type = "application/x-www-form-urlencoded"
+ }
+ body="{"id":1}"
format = "json"
schema = {
fields {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
index 0ed8f4d8e6..db6ea2edd2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
@@ -30,6 +30,7 @@ source {
name = "$.data[*].name"
age = "$.data[*].age"
}
+ keep_page_param_as_http_param = true
pageing = {
batch_size=10
page_field = page
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
index c0812b1789..a70dba8779 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
@@ -30,6 +30,7 @@ source {
name = "$.data[*].name"
age = "$.data[*].age"
}
+ keep_page_param_as_http_param = true
pageing = {
total_page_size = 2
page_field = page
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
index 06282c675e..731cb3ab0a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
@@ -30,6 +30,7 @@ source {
name = "$.data[*].name"
age = "$.data[*].age"
}
+ keep_page_param_as_http_param = true
pageing = {
total_page_size = 2
page_field = page
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_post_param_json_to_assert.conf
similarity index 89%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_post_param_json_to_assert.conf
index a8793f73b7..85b254d98f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_post_param_json_to_assert.conf
@@ -23,10 +23,15 @@ env {
source {
Http {
plugin_output = "http"
- url = "http://mockserver:1080/example/formBody"
+ url = "http://mockserver:1080/example/jsonBody/dynamic/param"
method = "POST"
- params ={id = 1}
+ body="""{"id":1,"pageIndex":"${pageIndex}"}"""
format = "json"
+ pageing={
+ page_field = pageIndex
+ start_page_number = 2
+ batch_size = 10
+ }
schema = {
fields {
name = string
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 42d000f713..2fc4a26961 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4476,6 +4476,35 @@
}
}
},
+ {
+ "httpRequest": {
+ "method" : "POST",
+ "path": "/example/jsonBody/dynamic/param",
+ "body": {
+ "type": "JSON",
+ "json": {
+ "id": 1,
+ "pageIndex": 2
+ },
+ "matchType": "STRICT"
+ }
+ },
+ "httpResponse": {
+ "body": [
+ {
+ "name": "lzl",
+ "age": 18
+ },
+ {
+ "name": "pizz",
+ "age": 19
+ }
+ ],
+ "headers": {
+ "Content-Type": "application/json"
+ }
+ }
+ },
{
"httpRequest": {
"path": "/example/formBody",
@@ -4745,4 +4774,4 @@
}
}
}
-]
+]
\ No newline at end of file