This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 926d591  [FLINK-39040] Additional body JSON for 
`GenericJsonAndUrlQueryCreator`
926d591 is described below

commit 926d5914b0f402d61757bbb7c13c76ebe2a7fc2e
Author: David Radley <[email protected]>
AuthorDate: Wed Feb 11 17:56:27 2026 +0000

    [FLINK-39040] Additional body JSON for `GenericJsonAndUrlQueryCreator`
---
 docs/content.zh/docs/connectors/table/http.md      |   3 +-
 docs/content/docs/connectors/table/http.md         |   3 +-
 .../GenericJsonAndUrlQueryCreator.java             |  21 +-
 .../GenericJsonAndUrlQueryCreatorFactory.java      |  85 +++-
 .../GenericJsonAndUrlQueryCreatorTest.java         | 439 ++++++++++++++++++++-
 5 files changed, 543 insertions(+), 8 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/http.md 
b/docs/content.zh/docs/connectors/table/http.md
index 3fad839..37bfc53 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -165,7 +165,7 @@ Note the options with the prefix _http_ are the HTTP 
connector specific options,
 
 | Option                                                                 | 
Required | Description/Value                                                    
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
 
|:-----------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| connector                                                              | 
required | The Value should be set to _http_                                    
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| connector                                                              | 
required | The Value should be set to _http_                                    
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
 | format                                                                 | 
required | Flink's format name that should be used to decode REST response, Use 
`json` for a typical REST endpoint.                                             
                                                                                
                                                                                
                                                                                
                  [...]
 | url                                                                    | 
required | The base URL that should be use for GET requests. For example 
_http://localhost:8080/client_                                                  
                                                                                
                                                                                
                                                                                
                         [...]
 | asyncPolling                                                           | 
optional | true/false - determines whether Async Polling should be used. 
Mechanism is based on Flink's Async I/O.                                        
                                                                                
                                                                                
                                                                                
                         [...]
@@ -205,6 +205,7 @@ Note the options with the prefix _http_ are the HTTP 
connector specific options,
 | http.source.lookup.proxy.password                                      | 
optional | Specify the password used for proxy authentication.                  
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
 | http.request.query-param-fields                                        | 
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The 
names of the fields that will be mapped to query parameters. The parameters are 
separated by semicolons, such as `param1;param2`.                               
                                                                                
                                                                                
                       [...]
 | http.request.body-fields                                               | 
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The 
names of the fields that will be mapped to the body. The parameters are 
separated by semicolons, such as `param1;param2`.                               
                                                                                
                                                                                
                               [...]
+| http.request.additional-body-json                                      | 
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. 
Additional JSON content to be merged into the request body for PUT and POST 
operations. The value should be a valid JSON object string (e.g., 
`'{"opportunity":{"source":"flink"},"priority":1}'`) that will be parsed and 
its fields merged at the top level with the generated request body. For 
example, if the body is `{"id":123}` and additional-bod [...]
 | http.request.url-map                                                   | 
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The map 
of insert names to column names used as url segments. Parses a string as a map 
of strings. For example if there are table columns called `customerId` and 
`orderId`, then specifying value `customerId:cid1,orderID:oid` and a url of 
https://myendpoint/customers/{cid}/orders/{oid} will mean that the url used for 
the lookup query will dynami [...]
 
 ### Query Creators
diff --git a/docs/content/docs/connectors/table/http.md 
b/docs/content/docs/connectors/table/http.md
index 543a9e5..a259d24 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -165,7 +165,7 @@ Note the options with the prefix _http_ are the HTTP 
connector specific options,
 
 | Option                                                                 | 
Required | Description/Value                                                    
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
 
|:-----------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| connector                                                              | 
required | The Value should be set to _http_                                    
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| connector                                                              | 
required | The Value should be set to _http_                                    
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
 | format                                                                 | 
required | Flink's format name that should be used to decode REST response, Use 
`json` for a typical REST endpoint.                                             
                                                                                
                                                                                
                                                                                
                  [...]
 | url                                                                    | 
required | The base URL that should be use for GET requests. For example 
_http://localhost:8080/client_                                                  
                                                                                
                                                                                
                                                                                
                         [...]
 | asyncPolling                                                           | 
optional | true/false - determines whether Async Polling should be used. 
Mechanism is based on Flink's Async I/O.                                        
                                                                                
                                                                                
                                                                                
                         [...]
@@ -205,6 +205,7 @@ Note the options with the prefix _http_ are the HTTP 
connector specific options,
 | http.source.lookup.proxy.password                                      | 
optional | Specify the password used for proxy authentication.                  
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
 | http.request.query-param-fields                                        | 
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The 
names of the fields that will be mapped to query parameters. The parameters are 
separated by semicolons, such as `param1;param2`.                               
                                                                                
                                                                                
                       [...]
 | http.request.body-fields                                               | 
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The 
names of the fields that will be mapped to the body. The parameters are 
separated by semicolons, such as `param1;param2`.                               
                                                                                
                                                                                
                               [...]
+| http.request.additional-body-json                                      | 
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. 
Additional JSON content to be merged into the request body for PUT and POST 
operations. The value should be a valid JSON object string (e.g., 
`'{"opportunity":{"source":"flink"},"priority":1}'`) that will be parsed and 
its fields merged at the top level with the generated request body. For 
example, if the body is `{"id":123}` and additional-bod [...]
 | http.request.url-map                                                   | 
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The map 
of insert names to column names used as url segments. Parses a string as a map 
of strings. For example if there are table columns called `customerId` and 
`orderId`, then specifying value `customerId:cid1,orderID:oid` and a url of 
https://myendpoint/customers/{cid}/orders/{oid} will mean that the url used for 
the lookup query will dynami [...]
 
 ### Query Creators
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
index a5c1f6f..409fac6 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
@@ -79,6 +79,7 @@ public class GenericJsonAndUrlQueryCreator implements 
LookupQueryCreator {
     private final List<String> requestQueryParamsFields;
     private final List<String> requestBodyFields;
     private final Map<String, String> requestUrlMap;
+    private final ObjectNode additionalRequestObject;
 
     /**
      * Construct a Generic JSON and URL query creator.
@@ -88,6 +89,8 @@ public class GenericJsonAndUrlQueryCreator implements 
LookupQueryCreator {
      * @param requestQueryParamsFields query param fields
      * @param requestBodyFields body fields used for PUT and POSTs
      * @param requestUrlMap url map
+     * @param additionalRequestObject pre-parsed additional JSON object to 
merge into request body
+     *     (parsed once in factory to avoid re-parsing on every lookup)
      * @param lookupRow lookup row itself.
      */
     public GenericJsonAndUrlQueryCreator(
@@ -96,6 +99,7 @@ public class GenericJsonAndUrlQueryCreator implements 
LookupQueryCreator {
             final List<String> requestQueryParamsFields,
             final List<String> requestBodyFields,
             final Map<String, String> requestUrlMap,
+            final ObjectNode additionalRequestObject,
             final LookupRow lookupRow) {
         this.httpMethod = httpMethod;
         this.serializationSchema = serializationSchema;
@@ -103,6 +107,7 @@ public class GenericJsonAndUrlQueryCreator implements 
LookupQueryCreator {
         this.requestQueryParamsFields = requestQueryParamsFields;
         this.requestBodyFields = requestBodyFields;
         this.requestUrlMap = requestUrlMap;
+        this.additionalRequestObject = additionalRequestObject;
     }
 
     @VisibleForTesting
@@ -145,9 +150,19 @@ public class GenericJsonAndUrlQueryCreator implements 
LookupQueryCreator {
             // Body-based queries
             // serialize to a string for the body.
             try {
-                lookupQuery =
-                        ObjectMapperAdapter.instance()
-                                
.writeValueAsString(jsonObject.retain(requestBodyFields));
+                ObjectNode bodyJsonObject = 
jsonObject.retain(requestBodyFields);
+
+                // Merge additional JSON if provided (already validated as 
object in factory)
+                if (additionalRequestObject != null) {
+                    // Merge all fields from additional JSON into the body
+                    // This preserves nested objects and arrays as-is
+                    additionalRequestObject
+                            .fields()
+                            .forEachRemaining(
+                                    entry -> 
bodyJsonObject.set(entry.getKey(), entry.getValue()));
+                }
+
+                lookupQuery = 
ObjectMapperAdapter.instance().writeValueAsString(bodyJsonObject);
             } catch (JsonProcessingException e) {
                 final String message = "Unable to convert Json Object to a 
string";
                 throw new RuntimeException(message, e);
diff --git 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
index c03ce7e..6c7e415 100644
--- 
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
+++ 
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
@@ -32,6 +32,11 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.SerializationFormatFactory;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -92,6 +97,19 @@ public class GenericJsonAndUrlQueryCreatorFactory implements 
LookupQueryCreatorF
                                     + "The expected format of the map is:"
                                     + "<br>"
                                     + " key1:value1,key2:value2");
+    public static final ConfigOption<String> REQUEST_ADDITIONAL_BODY_JSON =
+            key("http.request.additional-body-json")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Additional JSON content to be merged into the 
request body"
+                                    + " for PUT and POST operations. The value 
should be a valid"
+                                    + " JSON object string (e.g., 
'{\"c\":789}') that will be parsed"
+                                    + " and its fields merged at the top level 
with the generated"
+                                    + " request body. For example, if the body 
(join keys and runtime values)"
+                                    + " is {\"a\":123,\"b\":456}"
+                                    + " and additional-body-json is 
'{\"c\":789}', the result will be"
+                                    + " {\"a\":123,\"b\":456,\"c\":789}.");
 
     @Override
     public LookupQueryCreator createLookupQueryCreator(
@@ -103,8 +121,13 @@ public class GenericJsonAndUrlQueryCreatorFactory 
implements LookupQueryCreatorF
         // get the information from config
         final List<String> requestQueryParamsFields =
                 readableConfig.get(REQUEST_QUERY_PARAM_FIELDS);
-        final List<String> requestBodyFields = 
readableConfig.get(REQUEST_BODY_FIELDS);
         Map<String, String> requestUrlMap = 
readableConfig.get(REQUEST_URL_MAP);
+        final List<String> requestBodyFields = 
readableConfig.get(REQUEST_BODY_FIELDS);
+        String additionalRequestJson =
+                
readableConfig.getOptional(REQUEST_ADDITIONAL_BODY_JSON).orElse(null);
+
+        ObjectNode additionalRequestObject =
+                getValidatedAdditionalObjectNode(requestBodyFields, 
additionalRequestJson);
 
         final SerializationFormatFactory jsonFormatFactory =
                 FactoryUtil.discoverFactory(
@@ -137,6 +160,7 @@ public class GenericJsonAndUrlQueryCreatorFactory 
implements LookupQueryCreatorF
                 requestQueryParamsFields,
                 requestBodyFields,
                 requestUrlMap,
+                additionalRequestObject,
                 lookupRow);
     }
 
@@ -152,6 +176,63 @@ public class GenericJsonAndUrlQueryCreatorFactory 
implements LookupQueryCreatorF
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        return Set.of(REQUEST_QUERY_PARAM_FIELDS, REQUEST_BODY_FIELDS, 
REQUEST_URL_MAP);
+        return Set.of(
+                REQUEST_QUERY_PARAM_FIELDS,
+                REQUEST_BODY_FIELDS,
+                REQUEST_URL_MAP,
+                REQUEST_ADDITIONAL_BODY_JSON);
+    }
+
+    /**
+     * Creates and validates the additional JSON node from configuration. This 
method parses the
+     * JSON once during factory creation to avoid re-parsing on every lookup 
request, improving
+     * runtime performance.
+     *
+     * @param requestBodyFields the list of request body field names (join 
keys)
+     * @param additionalRequestJson the additional JSON string to validate and 
parse
+     * @return the parsed ObjectNode, or null if no additional JSON is provided
+     * @throws IllegalArgumentException if the JSON is invalid or contains 
conflicting fields
+     */
+    private ObjectNode getValidatedAdditionalObjectNode(
+            List<String> requestBodyFields, String additionalRequestJson) {
+        if (additionalRequestJson == null || 
additionalRequestJson.trim().isEmpty()) {
+            return null;
+        }
+
+        try {
+            // Parse the additional JSON once to avoid re-parsing on every 
lookup
+            JsonNode jsonNode = 
ObjectMapperAdapter.instance().readTree(additionalRequestJson);
+
+            if (!jsonNode.isObject()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The %s must be a valid JSON object.",
+                                REQUEST_ADDITIONAL_BODY_JSON.key()));
+            }
+
+            // Collect all conflicting fields
+            Set<String> conflictingFields = new HashSet<>();
+            jsonNode.fieldNames().forEachRemaining(conflictingFields::add);
+            conflictingFields.retainAll(requestBodyFields);
+
+            // If there are conflicts, throw exception with all conflicting 
fields
+            if (!conflictingFields.isEmpty()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The %s option should not override join keys, "
+                                        + "as join keys are expected to target 
different enrichments on a request basis. "
+                                        + "Found conflicting field(s): %s",
+                                REQUEST_ADDITIONAL_BODY_JSON.key(),
+                                String.join(", ", conflictingFields)));
+            }
+
+            return (ObjectNode) jsonNode;
+        } catch (JsonProcessingException e) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Invalid JSON in %s:",
+                            REQUEST_ADDITIONAL_BODY_JSON.key(), 
e.getMessage()),
+                    e);
+        }
     }
 }
diff --git 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
index 570846b..be789c5 100644
--- 
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
+++ 
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
@@ -205,6 +205,443 @@ class GenericJsonAndUrlQueryCreatorTest {
         assertThat(row.getField(KEY_3).equals("1970-01-01T00:00:00.010Z"));
     }
 
+    @Test
+    public void testAdditionalJsonSimpleFields() throws Exception {
+        // GIVEN - Simple additional fields
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("POST");
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"c\":789,\"d\":\"extra\"}");
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // WHEN
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+
+        // THEN
+        String expectedJson = "{\"key1\":\"val1\",\"c\":789,\"d\":\"extra\"}";
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode expected = mapper.readTree(expectedJson);
+        JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testAdditionalJsonNestedObject() throws Exception {
+        // GIVEN - Nested object in additional JSON
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("POST");
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"nested\":{\"field1\":\"value1\",\"field2\":123}}");
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // WHEN
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+
+        // THEN
+        String expectedJson =
+                
"{\"key1\":\"val1\",\"nested\":{\"field1\":\"value1\",\"field2\":123}}";
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode expected = mapper.readTree(expectedJson);
+        JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testAdditionalJsonMultipleNestedObjects() throws Exception {
+        // GIVEN - Multiple nested objects
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("POST");
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"obj1\":{\"a\":1,\"b\":2},\"obj2\":{\"c\":3,\"d\":4}}");
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // WHEN
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+
+        // THEN
+        String expectedJson =
+                
"{\"key1\":\"val1\",\"obj1\":{\"a\":1,\"b\":2},\"obj2\":{\"c\":3,\"d\":4}}";
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode expected = mapper.readTree(expectedJson);
+        JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testAdditionalJsonWithArray() throws Exception {
+        // GIVEN - Array in additional JSON
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("POST");
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"items\":[\"item1\",\"item2\",\"item3\"]}");
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // WHEN
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+
+        // THEN
+        String expectedJson = 
"{\"key1\":\"val1\",\"items\":[\"item1\",\"item2\",\"item3\"]}";
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode expected = mapper.readTree(expectedJson);
+        JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testAdditionalJsonComplexStructure() throws Exception {
+        // GIVEN - Complex nested structure with arrays and objects
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("POST");
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                
"{\"metadata\":{\"tags\":[\"tag1\",\"tag2\"],\"count\":5},\"flags\":[true,false]}");
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+        // WHEN
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+
+        // THEN
+        String expectedJson =
+                
"{\"key1\":\"val1\",\"metadata\":{\"tags\":[\"tag1\",\"tag2\"],\"count\":5},\"flags\":[true,false]}";
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode expected = mapper.readTree(expectedJson);
+        JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testAdditionalJsonWithBoolean() throws Exception {
+        // GIVEN - Boolean values in additional JSON
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("POST");
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"isActive\":true,\"isDeleted\":false}");
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // WHEN
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+
+        // THEN
+        String expectedJson = 
"{\"key1\":\"val1\",\"isActive\":true,\"isDeleted\":false}";
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode expected = mapper.readTree(expectedJson);
+        JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testAdditionalJsonNullOrEmpty() {
+        // GIVEN - Null additional JSON
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("POST");
+        // No additional JSON set
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // WHEN
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+
+        // THEN - Should work without additional JSON
+        
assertThat(createdQuery.getLookupQuery()).isEqualTo("{\"key1\":\"val1\"}");
+    }
+
+    @Test
+    public void testAdditionalJsonInvalidJson() {
+        // GIVEN - Invalid JSON
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("POST");
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{invalid json}");
+
+        // WHEN/THEN - Should throw IllegalArgumentException during factory 
creation
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> {
+                    new GenericJsonAndUrlQueryCreatorFactory()
+                            .createLookupQueryCreator(
+                                    config, lookupRow, getTableContext(config, 
RESOLVED_SCHEMA));
+                });
+    }
+
+    @Test
+    public void testAdditionalJsonNotAppliedToGet() {
+        // GIVEN - GET request with additional JSON
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("GET");
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON, 
"{\"c\":789}");
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // WHEN
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+
+        // THEN - Additional JSON should not affect GET requests (query params 
only)
+        assertThat(createdQuery.getLookupQuery()).isEqualTo("key1=val1");
+    }
+
+    @Test
+    public void testAdditionalJsonOverridesJoinKey() {
+        // GIVEN - Additional JSON that tries to override a join key
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = getConfiguration("POST");
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"key1\":\"override_value\",\"c\":789}");
+
+        // WHEN/THEN - Should throw IllegalArgumentException
+        IllegalArgumentException exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            new GenericJsonAndUrlQueryCreatorFactory()
+                                    .createLookupQueryCreator(
+                                            config,
+                                            lookupRow,
+                                            getTableContext(config, 
RESOLVED_SCHEMA));
+                        });
+
+        // Verify the error message
+        assertThat(exception.getMessage())
+                .contains(
+                        "The http.request.additional-body-json option should 
not override join keys");
+        assertThat(exception.getMessage())
+                .contains(
+                        "as join keys are expected to target different 
enrichments on a request basis");
+        assertThat(exception.getMessage()).contains("key1");
+    }
+
+    @Test
+    public void testAdditionalJsonOverridesMultipleJoinKeys() {
+        // GIVEN - Additional JSON that tries to override multiple join keys
+        LookupRow lookupRow = getLookupRow(KEY_1, KEY_2);
+        Configuration config = getConfiguration("POST");
+        // Set body fields to include both keys
+        config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS, 
List.of(KEY_1, KEY_2));
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"key1\":\"override1\",\"key2\":\"override2\",\"c\":789}");
+
+        // WHEN/THEN - Should throw IllegalArgumentException with all 
conflicting fields
+        IllegalArgumentException exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            new GenericJsonAndUrlQueryCreatorFactory()
+                                    .createLookupQueryCreator(
+                                            config,
+                                            lookupRow,
+                                            getTableContext(config, 
RESOLVED_SCHEMA));
+                        });
+
+        // Verify the error message contains both fields
+        assertThat(exception.getMessage())
+                .contains(
+                        "The http.request.additional-body-json option should 
not override join keys");
+        assertThat(exception.getMessage())
+                .contains(
+                        "as join keys are expected to target different 
enrichments on a request basis");
+        assertThat(exception.getMessage()).contains("Found conflicting 
field(s):");
+        assertThat(exception.getMessage()).contains("key1");
+        assertThat(exception.getMessage()).contains("key2");
+    }
+
+    @Test
+    public void testAdditionalJsonOverridesMultipleJoinKeysDifferentOrder() {
+        // GIVEN - Additional JSON with fields in different order than body 
fields
+        // Body fields: key1, key2
+        // Additional JSON: key2, key1 (reversed order)
+        LookupRow lookupRow = getLookupRow(KEY_1, KEY_2);
+        Configuration config = getConfiguration("POST");
+        // Set body fields: key1, key2
+        config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS, 
List.of(KEY_1, KEY_2));
+        // Additional JSON has reversed order: key2, key1
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"key2\":\"override2\",\"key1\":\"override1\",\"c\":789}");
+
+        // WHEN/THEN - Should throw IllegalArgumentException with all 
conflicting fields
+        IllegalArgumentException exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            new GenericJsonAndUrlQueryCreatorFactory()
+                                    .createLookupQueryCreator(
+                                            config,
+                                            lookupRow,
+                                            getTableContext(config, 
RESOLVED_SCHEMA));
+                        });
+
+        // Verify the error message contains both fields regardless of order
+        assertThat(exception.getMessage())
+                .contains(
+                        "The http.request.additional-body-json option should 
not override join keys");
+        assertThat(exception.getMessage())
+                .contains(
+                        "as join keys are expected to target different 
enrichments on a request basis");
+        assertThat(exception.getMessage()).contains("Found conflicting 
field(s):");
+        assertThat(exception.getMessage()).contains("key1");
+        assertThat(exception.getMessage()).contains("key2");
+    }
+
+    @Test
+    public void testAdditionalJsonWithNoBodyFields() throws Exception {
+        // GIVEN - Lookup key mapped to query param, not body field
+        // This allows additional JSON to be the entire request body
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        Configuration config = new Configuration();
+        config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, 
urlParams);
+        config.set(LOOKUP_METHOD, "POST");
+        // Map lookup key to query param, not body field
+        
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS, 
QUERY_PARAMS);
+        // Don't set REQUEST_BODY_FIELDS - it will be empty (no body fields)
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                
"{\"staticField\":\"staticValue\",\"count\":42,\"active\":true}");
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // WHEN
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+
+        // THEN - Additional JSON should be used as the entire body
+        String expectedJson = 
"{\"staticField\":\"staticValue\",\"count\":42,\"active\":true}";
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode expected = mapper.readTree(expectedJson);
+        JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+        assertThat(actual).isEqualTo(expected);
+        // Verify the lookup key is in query params, not body
+        
assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEqualTo(KEY_1 + "=" 
+ VALUE);
+    }
+
+    @Test
+    public void testAdditionalJsonOverridesBodyFieldFromUserScenario() {
+        // GIVEN - User scenario: body field 'customerId' with additional JSON 
trying to override it
+        LookupRow lookupRow = new LookupRow();
+        lookupRow.addLookupEntry(
+                new RowDataSingleValueLookupSchemaEntry(
+                        "customerId",
+                        
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+        lookupRow.setLookupPhysicalRowDataType(
+                row(List.of(DataTypes.FIELD("customerId", 
DataTypes.STRING()))));
+
+        Configuration config = new Configuration();
+        config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, 
urlParams);
+        config.set(LOOKUP_METHOD, "POST");
+        config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS, 
List.of("customerId"));
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"customerId\":\"bbb\"}");
+
+        // WHEN/THEN - Should throw IllegalArgumentException because 
additional JSON tries to
+        // override body field
+        IllegalArgumentException exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            new GenericJsonAndUrlQueryCreatorFactory()
+                                    .createLookupQueryCreator(
+                                            config,
+                                            lookupRow,
+                                            getTableContext(config, 
RESOLVED_SCHEMA));
+                        });
+
+        // Verify error message mentions the conflicting field
+        assertThat(exception.getMessage()).contains("customerId");
+        assertThat(exception.getMessage())
+                .contains("http.request.additional-body-json option should not 
override join keys");
+    }
+
+    @Test
+    public void testAdditionalJsonCaseSensitiveJoinKeyCheck() {
+        // GIVEN - Additional JSON with different case than join key
+        LookupRow lookupRow = getLookupRow(KEY_1); // key1
+        Configuration config = getConfiguration("POST");
+        // KEY1 (uppercase) should not conflict with key1 (lowercase) - case 
sensitive
+        config.set(
+                
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+                "{\"KEY1\":\"value\",\"c\":789}");
+
+        // WHEN - Should succeed because case is different
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, 
RESOLVED_SCHEMA));
+
+        // THEN - Should work fine
+        var createdQuery = creator.createLookupQuery(ROWDATA);
+        String lookupQuery = createdQuery.getLookupQuery();
+        assertThat(lookupQuery).contains("key1");
+        assertThat(lookupQuery).contains("KEY1");
+    }
+
     private static void validateCreatedQueryForGet(LookupQueryInfo 
createdQuery) {
         // check there is no body params and we have the expected lookup query
         assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEmpty();
@@ -232,7 +669,7 @@ class GenericJsonAndUrlQueryCreatorTest {
             
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS, 
QUERY_PARAMS);
         }
         config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, 
urlParams);
-        config.setString(LOOKUP_METHOD, operation);
+        config.set(LOOKUP_METHOD, operation);
         return config;
     }
 


Reply via email to