This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new 926d591 [FLINK-39040] Additional body JSON for
`GenericJsonAndUrlQueryCreator`
926d591 is described below
commit 926d5914b0f402d61757bbb7c13c76ebe2a7fc2e
Author: David Radley <[email protected]>
AuthorDate: Wed Feb 11 17:56:27 2026 +0000
[FLINK-39040] Additional body JSON for `GenericJsonAndUrlQueryCreator`
---
docs/content.zh/docs/connectors/table/http.md | 3 +-
docs/content/docs/connectors/table/http.md | 3 +-
.../GenericJsonAndUrlQueryCreator.java | 21 +-
.../GenericJsonAndUrlQueryCreatorFactory.java | 85 +++-
.../GenericJsonAndUrlQueryCreatorTest.java | 439 ++++++++++++++++++++-
5 files changed, 543 insertions(+), 8 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/http.md
b/docs/content.zh/docs/connectors/table/http.md
index 3fad839..37bfc53 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -165,7 +165,7 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| Option |
Required | Description/Value
[...]
|:-----------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| connector |
required | The Value should be set to _http_
[...]
+| connector |
required | The Value should be set to _http_
[...]
| format |
required | Flink's format name that should be used to decode REST response, Use
`json` for a typical REST endpoint.
[...]
| url |
required | The base URL that should be used for GET requests. For example
_http://localhost:8080/client_
[...]
| asyncPolling |
optional | true/false - determines whether Async Polling should be used.
Mechanism is based on Flink's Async I/O.
[...]
@@ -205,6 +205,7 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| http.source.lookup.proxy.password |
optional | Specify the password used for proxy authentication.
[...]
| http.request.query-param-fields |
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The
names of the fields that will be mapped to query parameters. The parameters are
separated by semicolons, such as `param1;param2`.
[...]
| http.request.body-fields |
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The
names of the fields that will be mapped to the body. The parameters are
separated by semicolons, such as `param1;param2`.
[...]
+| http.request.additional-body-json |
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator.
Additional JSON content to be merged into the request body for PUT and POST
operations. The value should be a valid JSON object string (e.g.,
`'{"opportunity":{"source":"flink"},"priority":1}'`) that will be parsed and
its fields merged at the top level with the generated request body. For
example, if the body is `{"id":123}` and additional-bod [...]
| http.request.url-map |
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The map
of insert names to column names used as url segments. Parses a string as a map
of strings. For example if there are table columns called `customerId` and
`orderId`, then specifying value `customerId:cid1,orderID:oid` and a url of
https://myendpoint/customers/{cid}/orders/{oid} will mean that the url used for
the lookup query will dynami [...]
### Query Creators
diff --git a/docs/content/docs/connectors/table/http.md
b/docs/content/docs/connectors/table/http.md
index 543a9e5..a259d24 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -165,7 +165,7 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| Option |
Required | Description/Value
[...]
|:-----------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| connector |
required | The Value should be set to _http_
[...]
+| connector |
required | The Value should be set to _http_
[...]
| format |
required | Flink's format name that should be used to decode REST response, Use
`json` for a typical REST endpoint.
[...]
| url |
required | The base URL that should be used for GET requests. For example
_http://localhost:8080/client_
[...]
| asyncPolling |
optional | true/false - determines whether Async Polling should be used.
Mechanism is based on Flink's Async I/O.
[...]
@@ -205,6 +205,7 @@ Note the options with the prefix _http_ are the HTTP
connector specific options,
| http.source.lookup.proxy.password |
optional | Specify the password used for proxy authentication.
[...]
| http.request.query-param-fields |
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The
names of the fields that will be mapped to query parameters. The parameters are
separated by semicolons, such as `param1;param2`.
[...]
| http.request.body-fields |
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The
names of the fields that will be mapped to the body. The parameters are
separated by semicolons, such as `param1;param2`.
[...]
+| http.request.additional-body-json |
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator.
Additional JSON content to be merged into the request body for PUT and POST
operations. The value should be a valid JSON object string (e.g.,
`'{"opportunity":{"source":"flink"},"priority":1}'`) that will be parsed and
its fields merged at the top level with the generated request body. For
example, if the body is `{"id":123}` and additional-bod [...]
| http.request.url-map |
optional | Used for the `GenericJsonAndUrlQueryCreator` query creator. The map
of insert names to column names used as url segments. Parses a string as a map
of strings. For example if there are table columns called `customerId` and
`orderId`, then specifying value `customerId:cid1,orderID:oid` and a url of
https://myendpoint/customers/{cid}/orders/{oid} will mean that the url used for
the lookup query will dynami [...]
### Query Creators
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
index a5c1f6f..409fac6 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
@@ -79,6 +79,7 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
private final List<String> requestQueryParamsFields;
private final List<String> requestBodyFields;
private final Map<String, String> requestUrlMap;
+ private final ObjectNode additionalRequestObject;
/**
* Construct a Generic JSON and URL query creator.
@@ -88,6 +89,8 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
* @param requestQueryParamsFields query param fields
* @param requestBodyFields body fields used for PUT and POSTs
* @param requestUrlMap url map
+ * @param additionalRequestObject pre-parsed additional JSON object to
merge into request body
+ * (parsed once in factory to avoid re-parsing on every lookup)
* @param lookupRow lookup row itself.
*/
public GenericJsonAndUrlQueryCreator(
@@ -96,6 +99,7 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
final List<String> requestQueryParamsFields,
final List<String> requestBodyFields,
final Map<String, String> requestUrlMap,
+ final ObjectNode additionalRequestObject,
final LookupRow lookupRow) {
this.httpMethod = httpMethod;
this.serializationSchema = serializationSchema;
@@ -103,6 +107,7 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
this.requestQueryParamsFields = requestQueryParamsFields;
this.requestBodyFields = requestBodyFields;
this.requestUrlMap = requestUrlMap;
+ this.additionalRequestObject = additionalRequestObject;
}
@VisibleForTesting
@@ -145,9 +150,19 @@ public class GenericJsonAndUrlQueryCreator implements
LookupQueryCreator {
// Body-based queries
// serialize to a string for the body.
try {
- lookupQuery =
- ObjectMapperAdapter.instance()
-
.writeValueAsString(jsonObject.retain(requestBodyFields));
+ ObjectNode bodyJsonObject =
jsonObject.retain(requestBodyFields);
+
+ // Merge additional JSON if provided (already validated as
object in factory)
+ if (additionalRequestObject != null) {
+ // Merge all fields from additional JSON into the body
+ // This preserves nested objects and arrays as-is
+ additionalRequestObject
+ .fields()
+ .forEachRemaining(
+ entry ->
bodyJsonObject.set(entry.getKey(), entry.getValue()));
+ }
+
+ lookupQuery =
ObjectMapperAdapter.instance().writeValueAsString(bodyJsonObject);
} catch (JsonProcessingException e) {
final String message = "Unable to convert Json Object to a
string";
throw new RuntimeException(message, e);
diff --git
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
index c03ce7e..6c7e415 100644
---
a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
+++
b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
@@ -32,6 +32,11 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -92,6 +97,19 @@ public class GenericJsonAndUrlQueryCreatorFactory implements
LookupQueryCreatorF
+ "The expected format of the map is:"
+ "<br>"
+ " key1:value1,key2:value2");
+ public static final ConfigOption<String> REQUEST_ADDITIONAL_BODY_JSON =
+ key("http.request.additional-body-json")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Additional JSON content to be merged into the
request body"
+ + " for PUT and POST operations. The value
should be a valid"
+ + " JSON object string (e.g.,
'{\"c\":789}') that will be parsed"
+ + " and its fields merged at the top level
with the generated"
+ + " request body. For example, if the body
(join keys and runtime values)"
+ + " is {\"a\":123,\"b\":456}"
+ + " and additional-body-json is
'{\"c\":789}', the result will be"
+ + " {\"a\":123,\"b\":456,\"c\":789}.");
@Override
public LookupQueryCreator createLookupQueryCreator(
@@ -103,8 +121,13 @@ public class GenericJsonAndUrlQueryCreatorFactory
implements LookupQueryCreatorF
// get the information from config
final List<String> requestQueryParamsFields =
readableConfig.get(REQUEST_QUERY_PARAM_FIELDS);
- final List<String> requestBodyFields =
readableConfig.get(REQUEST_BODY_FIELDS);
Map<String, String> requestUrlMap =
readableConfig.get(REQUEST_URL_MAP);
+ final List<String> requestBodyFields =
readableConfig.get(REQUEST_BODY_FIELDS);
+ String additionalRequestJson =
+
readableConfig.getOptional(REQUEST_ADDITIONAL_BODY_JSON).orElse(null);
+
+ ObjectNode additionalRequestObject =
+ getValidatedAdditionalObjectNode(requestBodyFields,
additionalRequestJson);
final SerializationFormatFactory jsonFormatFactory =
FactoryUtil.discoverFactory(
@@ -137,6 +160,7 @@ public class GenericJsonAndUrlQueryCreatorFactory
implements LookupQueryCreatorF
requestQueryParamsFields,
requestBodyFields,
requestUrlMap,
+ additionalRequestObject,
lookupRow);
}
@@ -152,6 +176,63 @@ public class GenericJsonAndUrlQueryCreatorFactory
implements LookupQueryCreatorF
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return Set.of(REQUEST_QUERY_PARAM_FIELDS, REQUEST_BODY_FIELDS,
REQUEST_URL_MAP);
+ return Set.of(
+ REQUEST_QUERY_PARAM_FIELDS,
+ REQUEST_BODY_FIELDS,
+ REQUEST_URL_MAP,
+ REQUEST_ADDITIONAL_BODY_JSON);
+ }
+
+    /**
+     * Creates and validates the additional JSON node from configuration. This method parses the
+     * JSON once during factory creation to avoid re-parsing on every lookup request, improving
+     * runtime performance.
+     *
+     * @param requestBodyFields the list of request body field names (join keys)
+     * @param additionalRequestJson the additional JSON string to validate and parse
+     * @return the parsed ObjectNode, or null if no additional JSON is provided
+     * @throws IllegalArgumentException if the JSON is invalid or contains conflicting fields
+     */
+    private ObjectNode getValidatedAdditionalObjectNode(
+            List<String> requestBodyFields, String additionalRequestJson) {
+        if (additionalRequestJson == null || additionalRequestJson.trim().isEmpty()) {
+            return null;
+        }
+
+        try {
+            // Parse the additional JSON once to avoid re-parsing on every lookup
+            JsonNode jsonNode = ObjectMapperAdapter.instance().readTree(additionalRequestJson);
+
+            if (!jsonNode.isObject()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The %s must be a valid JSON object.",
+                                REQUEST_ADDITIONAL_BODY_JSON.key()));
+            }
+
+            // Top-level names of the additional JSON that are also configured body fields
+            Set<String> conflictingFields = new HashSet<>();
+            jsonNode.fieldNames().forEachRemaining(conflictingFields::add);
+            conflictingFields.retainAll(requestBodyFields);
+
+            // If there are conflicts, throw exception with all conflicting fields
+            if (!conflictingFields.isEmpty()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "The %s option should not override join keys, "
+                                        + "as join keys are expected to target different enrichments on a request basis. "
+                                        + "Found conflicting field(s): %s",
+                                REQUEST_ADDITIONAL_BODY_JSON.key(),
+                                String.join(", ", conflictingFields)));
+            }
+
+            return (ObjectNode) jsonNode;
+        } catch (JsonProcessingException e) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Invalid JSON in %s: %s",
+                            REQUEST_ADDITIONAL_BODY_JSON.key(), e.getMessage()),
+                    e);
+        }
     }
}
diff --git
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
index 570846b..be789c5 100644
---
a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
+++
b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
@@ -205,6 +205,443 @@ class GenericJsonAndUrlQueryCreatorTest {
assertThat(row.getField(KEY_3).equals("1970-01-01T00:00:00.010Z"));
}
+ @Test
+ public void testAdditionalJsonSimpleFields() throws Exception {
+ // GIVEN - Simple additional fields
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("POST");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"c\":789,\"d\":\"extra\"}");
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN
+ String expectedJson = "{\"key1\":\"val1\",\"c\":789,\"d\":\"extra\"}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode expected = mapper.readTree(expectedJson);
+ JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testAdditionalJsonNestedObject() throws Exception {
+ // GIVEN - Nested object in additional JSON
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("POST");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"nested\":{\"field1\":\"value1\",\"field2\":123}}");
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN
+ String expectedJson =
+
"{\"key1\":\"val1\",\"nested\":{\"field1\":\"value1\",\"field2\":123}}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode expected = mapper.readTree(expectedJson);
+ JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testAdditionalJsonMultipleNestedObjects() throws Exception {
+ // GIVEN - Multiple nested objects
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("POST");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"obj1\":{\"a\":1,\"b\":2},\"obj2\":{\"c\":3,\"d\":4}}");
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN
+ String expectedJson =
+
"{\"key1\":\"val1\",\"obj1\":{\"a\":1,\"b\":2},\"obj2\":{\"c\":3,\"d\":4}}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode expected = mapper.readTree(expectedJson);
+ JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testAdditionalJsonWithArray() throws Exception {
+ // GIVEN - Array in additional JSON
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("POST");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"items\":[\"item1\",\"item2\",\"item3\"]}");
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN
+ String expectedJson =
"{\"key1\":\"val1\",\"items\":[\"item1\",\"item2\",\"item3\"]}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode expected = mapper.readTree(expectedJson);
+ JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testAdditionalJsonComplexStructure() throws Exception {
+ // GIVEN - Complex nested structure with arrays and objects
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("POST");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+
"{\"metadata\":{\"tags\":[\"tag1\",\"tag2\"],\"count\":5},\"flags\":[true,false]}");
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+ // WHEN
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN
+ String expectedJson =
+
"{\"key1\":\"val1\",\"metadata\":{\"tags\":[\"tag1\",\"tag2\"],\"count\":5},\"flags\":[true,false]}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode expected = mapper.readTree(expectedJson);
+ JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testAdditionalJsonWithBoolean() throws Exception {
+ // GIVEN - Boolean values in additional JSON
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("POST");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"isActive\":true,\"isDeleted\":false}");
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN
+ String expectedJson =
"{\"key1\":\"val1\",\"isActive\":true,\"isDeleted\":false}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode expected = mapper.readTree(expectedJson);
+ JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testAdditionalJsonNullOrEmpty() {
+ // GIVEN - Null additional JSON
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("POST");
+ // No additional JSON set
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN - Should work without additional JSON
+
assertThat(createdQuery.getLookupQuery()).isEqualTo("{\"key1\":\"val1\"}");
+ }
+
+ @Test
+ public void testAdditionalJsonInvalidJson() {
+ // GIVEN - Invalid JSON
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("POST");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{invalid json}");
+
+ // WHEN/THEN - Should throw IllegalArgumentException during factory
creation
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config, lookupRow, getTableContext(config,
RESOLVED_SCHEMA));
+ });
+ }
+
+ @Test
+ public void testAdditionalJsonNotAppliedToGet() {
+ // GIVEN - GET request with additional JSON
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("GET");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
"{\"c\":789}");
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN - Additional JSON should not affect GET requests (query params
only)
+ assertThat(createdQuery.getLookupQuery()).isEqualTo("key1=val1");
+ }
+
+ @Test
+ public void testAdditionalJsonOverridesJoinKey() {
+ // GIVEN - Additional JSON that tries to override a join key
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = getConfiguration("POST");
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"key1\":\"override_value\",\"c\":789}");
+
+ // WHEN/THEN - Should throw IllegalArgumentException
+ IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+ });
+
+ // Verify the error message
+ assertThat(exception.getMessage())
+ .contains(
+ "The http.request.additional-body-json option should
not override join keys");
+ assertThat(exception.getMessage())
+ .contains(
+ "as join keys are expected to target different
enrichments on a request basis");
+ assertThat(exception.getMessage()).contains("key1");
+ }
+
+ @Test
+ public void testAdditionalJsonOverridesMultipleJoinKeys() {
+ // GIVEN - Additional JSON that tries to override multiple join keys
+ LookupRow lookupRow = getLookupRow(KEY_1, KEY_2);
+ Configuration config = getConfiguration("POST");
+ // Set body fields to include both keys
+ config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS,
List.of(KEY_1, KEY_2));
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"key1\":\"override1\",\"key2\":\"override2\",\"c\":789}");
+
+ // WHEN/THEN - Should throw IllegalArgumentException with all
conflicting fields
+ IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+ });
+
+ // Verify the error message contains both fields
+ assertThat(exception.getMessage())
+ .contains(
+ "The http.request.additional-body-json option should
not override join keys");
+ assertThat(exception.getMessage())
+ .contains(
+ "as join keys are expected to target different
enrichments on a request basis");
+ assertThat(exception.getMessage()).contains("Found conflicting
field(s):");
+ assertThat(exception.getMessage()).contains("key1");
+ assertThat(exception.getMessage()).contains("key2");
+ }
+
+ @Test
+ public void testAdditionalJsonOverridesMultipleJoinKeysDifferentOrder() {
+ // GIVEN - Additional JSON with fields in different order than body
fields
+ // Body fields: key1, key2
+ // Additional JSON: key2, key1 (reversed order)
+ LookupRow lookupRow = getLookupRow(KEY_1, KEY_2);
+ Configuration config = getConfiguration("POST");
+ // Set body fields: key1, key2
+ config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS,
List.of(KEY_1, KEY_2));
+ // Additional JSON has reversed order: key2, key1
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"key2\":\"override2\",\"key1\":\"override1\",\"c\":789}");
+
+ // WHEN/THEN - Should throw IllegalArgumentException with all
conflicting fields
+ IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+ });
+
+ // Verify the error message contains both fields regardless of order
+ assertThat(exception.getMessage())
+ .contains(
+ "The http.request.additional-body-json option should
not override join keys");
+ assertThat(exception.getMessage())
+ .contains(
+ "as join keys are expected to target different
enrichments on a request basis");
+ assertThat(exception.getMessage()).contains("Found conflicting
field(s):");
+ assertThat(exception.getMessage()).contains("key1");
+ assertThat(exception.getMessage()).contains("key2");
+ }
+
+ @Test
+ public void testAdditionalJsonWithNoBodyFields() throws Exception {
+ // GIVEN - Lookup key mapped to query param, not body field
+ // This allows additional JSON to be the entire request body
+ LookupRow lookupRow = getLookupRow(KEY_1);
+ Configuration config = new Configuration();
+ config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP,
urlParams);
+ config.set(LOOKUP_METHOD, "POST");
+ // Map lookup key to query param, not body field
+
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS,
QUERY_PARAMS);
+ // Don't set REQUEST_BODY_FIELDS - it will be empty (no body fields)
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+
"{\"staticField\":\"staticValue\",\"count\":42,\"active\":true}");
+
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // WHEN
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+
+ // THEN - Additional JSON should be used as the entire body
+ String expectedJson =
"{\"staticField\":\"staticValue\",\"count\":42,\"active\":true}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode expected = mapper.readTree(expectedJson);
+ JsonNode actual = mapper.readTree(createdQuery.getLookupQuery());
+ assertThat(actual).isEqualTo(expected);
+ // Verify the lookup key is in query params, not body
+
assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEqualTo(KEY_1 + "="
+ VALUE);
+ }
+
+ @Test
+ public void testAdditionalJsonOverridesBodyFieldFromUserScenario() {
+ // GIVEN - User scenario: body field 'customerId' with additional JSON
trying to override it
+ LookupRow lookupRow = new LookupRow();
+ lookupRow.addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry(
+ "customerId",
+
RowData.createFieldGetter(DataTypes.STRING().getLogicalType(), 0)));
+ lookupRow.setLookupPhysicalRowDataType(
+ row(List.of(DataTypes.FIELD("customerId",
DataTypes.STRING()))));
+
+ Configuration config = new Configuration();
+ config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP,
urlParams);
+ config.set(LOOKUP_METHOD, "POST");
+ config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS,
List.of("customerId"));
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"customerId\":\"bbb\"}");
+
+ // WHEN/THEN - Should throw IllegalArgumentException because
additional JSON tries to
+ // override body field
+ IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+ });
+
+ // Verify error message mentions the conflicting field
+ assertThat(exception.getMessage()).contains("customerId");
+ assertThat(exception.getMessage())
+ .contains("http.request.additional-body-json option should not
override join keys");
+ }
+
+ @Test
+ public void testAdditionalJsonCaseSensitiveJoinKeyCheck() {
+ // GIVEN - Additional JSON with different case than join key
+ LookupRow lookupRow = getLookupRow(KEY_1); // key1
+ Configuration config = getConfiguration("POST");
+ // KEY1 (uppercase) should not conflict with key1 (lowercase) - case
sensitive
+ config.set(
+
GenericJsonAndUrlQueryCreatorFactory.REQUEST_ADDITIONAL_BODY_JSON,
+ "{\"KEY1\":\"value\",\"c\":789}");
+
+ // WHEN - Should succeed because case is different
+ GenericJsonAndUrlQueryCreator creator =
+ (GenericJsonAndUrlQueryCreator)
+ new GenericJsonAndUrlQueryCreatorFactory()
+ .createLookupQueryCreator(
+ config,
+ lookupRow,
+ getTableContext(config,
RESOLVED_SCHEMA));
+
+ // THEN - Should work fine
+ var createdQuery = creator.createLookupQuery(ROWDATA);
+ String lookupQuery = createdQuery.getLookupQuery();
+ assertThat(lookupQuery).contains("key1");
+ assertThat(lookupQuery).contains("KEY1");
+ }
+
private static void validateCreatedQueryForGet(LookupQueryInfo
createdQuery) {
// check there is no body params and we have the expected lookup query
assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEmpty();
@@ -232,7 +669,7 @@ class GenericJsonAndUrlQueryCreatorTest {
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS,
QUERY_PARAMS);
}
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP,
urlParams);
- config.setString(LOOKUP_METHOD, operation);
+ config.set(LOOKUP_METHOD, operation);
return config;
}