Hisoka-X commented on code in PR #9184:
URL: https://github.com/apache/seatunnel/pull/9184#discussion_r2065289974


##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java:
##########
@@ -170,41 +171,145 @@ private void updateRequestParam(PageInfo pageInfo) {
             }
             return;
         }
-
+        Long pageValue = pageInfo.getPageIndex();
+        String pageField = pageInfo.getPageField();
+
+        // Process headers
+        if (MapUtils.isNotEmpty(this.httpParameter.getHeaders())) {
+            processPageMap(
+                    this.httpParameter.getHeaders(),
+                    pageField,
+                    pageValue.toString(),
+                    usePlaceholderReplacement);
+
+            processPageMap(
+                    this.httpParameter.getHeaders(),
+                    pageInfo.getPageCursorFieldName(),
+                    pageInfo.getCursor(),
+                    usePlaceholderReplacement);
+        }
         // if not set keepPageParamAsHttpParam, but page field is in params, 
then set page index as
-        // params
         if (MapUtils.isNotEmpty(this.httpParameter.getParams())) {
 
-            // set page index as params
-            if 
(this.httpParameter.getParams().containsKey(pageInfo.getPageField())) {
-                this.httpParameter
-                        .getParams()
-                        .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
-            }
-
-            // set page cursor as params
-            if 
(this.httpParameter.getParams().containsKey(pageInfo.getPageCursorFieldName())
-                    && pageInfo.getCursor() != null) {
-                this.httpParameter
-                        .getParams()
-                        .put(pageInfo.getPageCursorFieldName(), 
pageInfo.getCursor());
-            }
+            processPageMap(
+                    this.httpParameter.getParams(),
+                    pageField,
+                    pageValue.toString(),
+                    usePlaceholderReplacement);
+            processPageMap(
+                    this.httpParameter.getParams(),
+                    pageInfo.getPageCursorFieldName(),
+                    pageInfo.getCursor(),
+                    usePlaceholderReplacement);
         }
 
         // 2. param in body
         if (MapUtils.isNotEmpty(this.httpParameter.getBody())) {
+            processBodyPageMap(
+                    this.httpParameter.getBody(), pageField, pageValue, 
usePlaceholderReplacement);
+            processBodyPageMap(
+                    this.httpParameter.getBody(),
+                    pageInfo.getPageCursorFieldName(),
+                    pageInfo.getCursor(),
+                    usePlaceholderReplacement);
+        }
+    }
+
+    /**
+     * Replace placeholder in a string value
+     *
+     * @param value The string value that may contain a placeholder
+     * @param pageField The page field name
+     * @param pageValue The page value to replace the placeholder with
+     * @return The string with placeholder replaced, or null if no placeholder 
found
+     */
+    private String replacePlaceholder(String value, String pageField, Object 
pageValue) {
+        if (value == null || pageField == null || !value.contains("${" + 
pageField + "}")) {
+            return null;

Review Comment:
   @CosmosNi 



##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java:
##########
@@ -170,41 +171,145 @@ private void updateRequestParam(PageInfo pageInfo) {
             }
             return;
         }
-
+        Long pageValue = pageInfo.getPageIndex();
+        String pageField = pageInfo.getPageField();
+
+        // Process headers
+        if (MapUtils.isNotEmpty(this.httpParameter.getHeaders())) {
+            processPageMap(
+                    this.httpParameter.getHeaders(),
+                    pageField,
+                    pageValue.toString(),
+                    usePlaceholderReplacement);
+
+            processPageMap(
+                    this.httpParameter.getHeaders(),
+                    pageInfo.getPageCursorFieldName(),
+                    pageInfo.getCursor(),
+                    usePlaceholderReplacement);
+        }
         // if not set keepPageParamAsHttpParam, but page field is in params, 
then set page index as
-        // params
         if (MapUtils.isNotEmpty(this.httpParameter.getParams())) {
 
-            // set page index as params
-            if 
(this.httpParameter.getParams().containsKey(pageInfo.getPageField())) {
-                this.httpParameter
-                        .getParams()
-                        .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
-            }
-
-            // set page cursor as params
-            if 
(this.httpParameter.getParams().containsKey(pageInfo.getPageCursorFieldName())
-                    && pageInfo.getCursor() != null) {
-                this.httpParameter
-                        .getParams()
-                        .put(pageInfo.getPageCursorFieldName(), 
pageInfo.getCursor());
-            }
+            processPageMap(
+                    this.httpParameter.getParams(),
+                    pageField,
+                    pageValue.toString(),
+                    usePlaceholderReplacement);
+            processPageMap(
+                    this.httpParameter.getParams(),
+                    pageInfo.getPageCursorFieldName(),
+                    pageInfo.getCursor(),
+                    usePlaceholderReplacement);
         }
 
         // 2. param in body
         if (MapUtils.isNotEmpty(this.httpParameter.getBody())) {
+            processBodyPageMap(
+                    this.httpParameter.getBody(), pageField, pageValue, 
usePlaceholderReplacement);
+            processBodyPageMap(
+                    this.httpParameter.getBody(),
+                    pageInfo.getPageCursorFieldName(),
+                    pageInfo.getCursor(),
+                    usePlaceholderReplacement);
+        }
+    }
+
+    /**
+     * Replace placeholder in a string value
+     *
+     * @param value The string value that may contain a placeholder
+     * @param pageField The page field name
+     * @param pageValue The page value to replace the placeholder with
+     * @return The string with placeholder replaced, or null if no placeholder 
found
+     */
+    private String replacePlaceholder(String value, String pageField, Object 
pageValue) {
+        if (value == null || pageField == null || !value.contains("${" + 
pageField + "}")) {
+            return null;
+        }
+
+        String placeholder = "${" + pageField + "}";
+        int placeholderIndex = value.indexOf(placeholder);
+        if (placeholderIndex >= 0) {
+            String prefix = value.substring(0, placeholderIndex);
+            String suffix = value.substring(placeholderIndex + 
placeholder.length());
+            return prefix + pageValue + suffix;
+        }
+        return null;
+    }
 
-            // set page index as body
-            if 
(this.httpParameter.getBody().containsKey(pageInfo.getPageField())) {
-                this.httpParameter.getBody().put(pageInfo.getPageField(), 
pageInfo.getPageIndex());
+    private void processPageMap(
+            Map<String, String> map,
+            String pageField,
+            String pageValue,
+            boolean usePlaceholderReplacement) {
+        if (usePlaceholderReplacement) {
+            // Placeholder replacement
+            Map<String, String> updatedMap = new HashMap<>();
+            for (Map.Entry<String, String> entry : map.entrySet()) {
+                String key = entry.getKey();
+                String value = entry.getValue();
+                String replacedValue = replacePlaceholder(value, pageField, 
pageValue);
+                if (replacedValue != null) {
+                    updatedMap.put(key, replacedValue);
+                }
             }
+            map.putAll(updatedMap);
+        } else if (map.containsKey(pageField)) {
+            // Key-based replacement
+            map.put(pageField, pageValue);
+        }
+    }
 
-            // set page cursor as body
-            if 
(this.httpParameter.getBody().containsKey(pageInfo.getPageCursorFieldName())
-                    && pageInfo.getCursor() != null) {
-                this.httpParameter
-                        .getBody()
-                        .put(pageInfo.getPageCursorFieldName(), 
pageInfo.getCursor());
+    /**
+     * Process the body map for parameter replacement, handling nested objects 
by converting the
+     * entire object to JSON string, replacing placeholders, and converting 
back to object.
+     *
+     * @param bodyMap The body map to process
+     * @param pageField The page field name
+     * @param pageValue The page index (Long value for direct replacement)
+     * @param usePlaceholderReplacement Whether to use placeholder replacement
+     */
+    private void processBodyPageMap(
+            Map<String, Object> bodyMap,
+            String pageField,
+            Object pageValue,
+            boolean usePlaceholderReplacement) {
+        if (usePlaceholderReplacement) {
+            String jsonBody = JsonUtils.toJsonString(bodyMap);
+            String placeholder = "\"${" + pageField + "}\"";

Review Comment:
   please delete it.



##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java:
##########
@@ -170,41 +173,214 @@ private void updateRequestParam(PageInfo pageInfo) {
             }
             return;
         }
-
+        Long pageValue = pageInfo.getPageIndex();
+        String pageField = pageInfo.getPageField();
+
+        // Process headers
+        if (MapUtils.isNotEmpty(this.httpParameter.getHeaders())) {
+            processPageMap(
+                    this.httpParameter.getHeaders(),
+                    pageField,
+                    pageValue.toString(),
+                    usePlaceholderReplacement);
+
+            processPageMap(
+                    this.httpParameter.getHeaders(),
+                    pageInfo.getPageCursorFieldName(),
+                    pageInfo.getCursor(),
+                    usePlaceholderReplacement);
+        }
         // if not set keepPageParamAsHttpParam, but page field is in params, 
then set page index as
-        // params
         if (MapUtils.isNotEmpty(this.httpParameter.getParams())) {
 
-            // set page index as params
-            if 
(this.httpParameter.getParams().containsKey(pageInfo.getPageField())) {
-                this.httpParameter
-                        .getParams()
-                        .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
+            processPageMap(
+                    this.httpParameter.getParams(),
+                    pageField,
+                    pageValue.toString(),
+                    usePlaceholderReplacement);
+            processPageMap(
+                    this.httpParameter.getParams(),
+                    pageInfo.getPageCursorFieldName(),
+                    pageInfo.getCursor(),
+                    usePlaceholderReplacement);
+        }
+
+        // 2. param in body
+        if (!Strings.isNullOrEmpty(this.httpParameter.getBody())) {
+            String processedBody =
+                    processBodyString(
+                            this.httpParameter.getBody(),
+                            pageField,
+                            pageValue,
+                            usePlaceholderReplacement);
+
+            // Process cursor if available
+            if (pageInfo.getPageCursorFieldName() != null && 
pageInfo.getCursor() != null) {
+                processedBody =
+                        processBodyString(
+                                processedBody,
+                                pageInfo.getPageCursorFieldName(),
+                                pageInfo.getCursor(),
+                                usePlaceholderReplacement);
             }
 
-            // set page cursor as params
-            if 
(this.httpParameter.getParams().containsKey(pageInfo.getPageCursorFieldName())
-                    && pageInfo.getCursor() != null) {
-                this.httpParameter
-                        .getParams()
-                        .put(pageInfo.getPageCursorFieldName(), 
pageInfo.getCursor());
+            // Update the body string
+            this.httpParameter.setBody(processedBody);
+        }
+    }
+
+    /**
+     * Replace placeholder in a string value
+     *
+     * @param value The string value that may contain a placeholder
+     * @param pageField The page field name
+     * @param pageValue The page value to replace the placeholder with
+     * @return The string with placeholder replaced, or null if no placeholder 
found
+     */
+    private String replacePlaceholder(String value, String pageField, Object 
pageValue) {
+        if (value == null || pageField == null || !value.contains("${" + 
pageField + "}")) {
+            return null;
+        }
+
+        String placeholder = "${" + pageField + "}";
+        int placeholderIndex = value.indexOf(placeholder);
+        if (placeholderIndex >= 0) {
+            String prefix = value.substring(0, placeholderIndex);
+            String suffix = value.substring(placeholderIndex + 
placeholder.length());
+            return prefix + pageValue + suffix;
+        }
+        return null;
+    }
+
+    private void processPageMap(
+            Map<String, String> map,
+            String pageField,
+            String pageValue,
+            boolean usePlaceholderReplacement) {
+        if (usePlaceholderReplacement) {
+            // Placeholder replacement
+            Map<String, String> updatedMap = new HashMap<>();
+            for (Map.Entry<String, String> entry : map.entrySet()) {
+                String key = entry.getKey();
+                String value = entry.getValue();
+                String replacedValue = replacePlaceholder(value, pageField, 
pageValue);
+                if (replacedValue != null) {
+                    updatedMap.put(key, replacedValue);
+                }
             }
+            map.putAll(updatedMap);
+        } else if (map.containsKey(pageField)) {
+            // Key-based replacement
+            map.put(pageField, pageValue);
         }
+    }
 
-        // 2. param in body
-        if (MapUtils.isNotEmpty(this.httpParameter.getBody())) {
+    private String processBodyString(
+            String bodyString,
+            String pageField,
+            Object pageValue,
+            boolean usePlaceholderReplacement) {
+        if (pageField == null || pageValue == null || 
Strings.isNullOrEmpty(bodyString)) {
+            return bodyString;
+        }
+        if (usePlaceholderReplacement) {
+            String unquotedPlaceholder = "${" + pageField + "}";
+            if (bodyString.contains(unquotedPlaceholder)) {
+                bodyString = bodyString.replace(unquotedPlaceholder, 
pageValue.toString());
+            }
 
-            // set page index as body
-            if 
(this.httpParameter.getBody().containsKey(pageInfo.getPageField())) {
-                this.httpParameter.getBody().put(pageInfo.getPageField(), 
pageInfo.getPageIndex());
+            return bodyString;
+        } else {
+            // Key-based replacement
+            try {
+                Map<String, Object> bodyMap =
+                        JsonUtils.parseObject(
+                                bodyString, new TypeReference<Map<String, 
Object>>() {});
+                if (bodyMap != null) {
+                    processBodyMapRecursively(bodyMap, pageField, pageValue);
+                    return JsonUtils.toJsonString(bodyMap);
+                }
+            } catch (Exception e) {
+                log.warn(
+                        "Failed to parse body string for key-based 
replacement: {}", bodyString, e);

Review Comment:
   Throw an exception here too, rather than only logging a warning.



##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java:
##########
@@ -118,9 +122,25 @@ public HttpResponse execute(
             String method,
             Map<String, String> headers,
             Map<String, String> params,
-            Map<String, Object> body,
+            String body,
             boolean keepParamsAsForm)
             throws Exception {
+        Map<String, Object> bodyMap = new HashMap<>();
+        // If body is set but bodyMap is not, convert body to bodyMap
+        if (!Strings.isNullOrEmpty(body)) {
+            try {
+                bodyMap =
+                        ConfigFactory.parseString(body).entrySet().stream()
+                                .collect(
+                                        Collectors.toMap(
+                                                Map.Entry::getKey,
+                                                entry -> 
entry.getValue().unwrapped(),
+                                                (v1, v2) -> v2));
+            } catch (Exception e) {
+                log.warn("Failed to parse body string to map: {}", body, e);

Review Comment:
   We should throw the exception directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to