ferenc-csaky commented on code in PR #17:
URL:
https://github.com/apache/flink-connector-http/pull/17#discussion_r2788019239
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java:
##########
@@ -152,6 +178,67 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return Set.of(REQUEST_QUERY_PARAM_FIELDS, REQUEST_BODY_FIELDS, REQUEST_URL_MAP);
+ return Set.of(
+ REQUEST_QUERY_PARAM_FIELDS,
+ REQUEST_BODY_FIELDS,
+ REQUEST_URL_MAP,
+ REQUEST_ADDITIONAL_BODY_JSON);
+ }
+
+ /**
+ * Creates and validates the additional JSON node from configuration. This method parses the
+ * JSON once during factory creation to avoid re-parsing on every lookup request, improving
+ * runtime performance.
+ *
+ * @param requestBodyFields the list of request body field names (join keys)
+ * @param additionalRequestJson the additional JSON string to validate and parse
+ * @return the parsed ObjectNode, or null if no additional JSON is provided
+ * @throws IllegalArgumentException if the JSON is invalid or contains conflicting fields
+ */
+ private ObjectNode createAdditionalObjectNode(
Review Comment:
Maybe `getValidatedAdditionalObjectNode`? Since we emphasize validation, I
think it's better to reflect that in the method name; then we don't need the
comment where we call this method at L130.
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java:
##########
@@ -152,6 +178,67 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return Set.of(REQUEST_QUERY_PARAM_FIELDS, REQUEST_BODY_FIELDS, REQUEST_URL_MAP);
+ return Set.of(
+ REQUEST_QUERY_PARAM_FIELDS,
+ REQUEST_BODY_FIELDS,
+ REQUEST_URL_MAP,
+ REQUEST_ADDITIONAL_BODY_JSON);
+ }
+
+ /**
+ * Creates and validates the additional JSON node from configuration. This method parses the
+ * JSON once during factory creation to avoid re-parsing on every lookup request, improving
+ * runtime performance.
+ *
+ * @param requestBodyFields the list of request body field names (join keys)
+ * @param additionalRequestJson the additional JSON string to validate and parse
+ * @return the parsed ObjectNode, or null if no additional JSON is provided
+ * @throws IllegalArgumentException if the JSON is invalid or contains conflicting fields
+ */
+ private ObjectNode createAdditionalObjectNode(
+ List<String> requestBodyFields, String additionalRequestJson) {
+ if (additionalRequestJson == null || additionalRequestJson.trim().isEmpty()) {
+ return null;
+ }
+
+ try {
+ // Parse the additional JSON once to avoid re-parsing on every lookup
+ ObjectMapper mapper = ObjectMapperAdapter.instance();
+ JsonNode jsonNode = mapper.readTree(additionalRequestJson);
+
+ if (!jsonNode.isObject()) {
+ throw new IllegalArgumentException(
+ "The http.request.additional-body-json must be a valid JSON object.");
+ }
+
+ // Get join key names from request body fields (case-sensitive)
+ Set<String> joinKeyNames = new HashSet<>(requestBodyFields);
+
+ // Collect all conflicting fields
+ Set<String> conflictingFields = new HashSet<>();
+ Iterator<String> fieldNames = jsonNode.fieldNames();
+ while (fieldNames.hasNext()) {
+ String fieldName = fieldNames.next();
+ if (joinKeyNames.contains(fieldName)) {
+ conflictingFields.add(fieldName);
+ }
+ }
Review Comment:
This can be simplified:
```java
Set<String> conflictingFields = new HashSet<>();
jsonNode.fieldNames().forEachRemaining(conflictingFields::add);
conflictingFields.retainAll(joinKeyNames);
```
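For reference, here is a minimal, self-contained sketch of that simplification using only the JDK. The class name, the `findConflictingFields` helper, and the sample field names are hypothetical; the plain `Iterator<String>` stands in for what `jsonNode.fieldNames()` returns in the PR.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class ConflictingFieldsSketch {

    // Hypothetical helper: fieldNames stands in for jsonNode.fieldNames().
    static Set<String> findConflictingFields(
            Iterator<String> fieldNames, Set<String> joinKeyNames) {
        Set<String> conflictingFields = new HashSet<>();
        // Copy every field name, then keep only those that are also join keys.
        fieldNames.forEachRemaining(conflictingFields::add);
        conflictingFields.retainAll(joinKeyNames);
        return conflictingFields;
    }

    public static void main(String[] args) {
        // Sample data, assumed for illustration only.
        Set<String> joinKeyNames = new HashSet<>(List.of("id", "name"));
        Iterator<String> fieldNames = List.of("id", "tenant", "region").iterator();
        System.out.println(findConflictingFields(fieldNames, joinKeyNames)); // prints [id]
    }
}
```

The result should match the explicit `while` loop: both produce the intersection of the JSON field names and the join keys.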
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java:
##########
@@ -152,6 +178,67 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return Set.of(REQUEST_QUERY_PARAM_FIELDS, REQUEST_BODY_FIELDS, REQUEST_URL_MAP);
+ return Set.of(
+ REQUEST_QUERY_PARAM_FIELDS,
+ REQUEST_BODY_FIELDS,
+ REQUEST_URL_MAP,
+ REQUEST_ADDITIONAL_BODY_JSON);
+ }
+
+ /**
+ * Creates and validates the additional JSON node from configuration. This method parses the
+ * JSON once during factory creation to avoid re-parsing on every lookup request, improving
+ * runtime performance.
+ *
+ * @param requestBodyFields the list of request body field names (join keys)
+ * @param additionalRequestJson the additional JSON string to validate and parse
+ * @return the parsed ObjectNode, or null if no additional JSON is provided
+ * @throws IllegalArgumentException if the JSON is invalid or contains conflicting fields
+ */
+ private ObjectNode createAdditionalObjectNode(
+ List<String> requestBodyFields, String additionalRequestJson) {
+ if (additionalRequestJson == null || additionalRequestJson.trim().isEmpty()) {
+ return null;
+ }
+
+ try {
+ // Parse the additional JSON once to avoid re-parsing on every lookup
+ ObjectMapper mapper = ObjectMapperAdapter.instance();
+ JsonNode jsonNode = mapper.readTree(additionalRequestJson);
+
+ if (!jsonNode.isObject()) {
+ throw new IllegalArgumentException(
+ "The http.request.additional-body-json must be a valid JSON object.");
+ }
+
+ // Get join key names from request body fields (case-sensitive)
+ Set<String> joinKeyNames = new HashSet<>(requestBodyFields);
+
+ // Collect all conflicting fields
+ Set<String> conflictingFields = new HashSet<>();
+ Iterator<String> fieldNames = jsonNode.fieldNames();
+ while (fieldNames.hasNext()) {
+ String fieldName = fieldNames.next();
+ if (joinKeyNames.contains(fieldName)) {
+ conflictingFields.add(fieldName);
+ }
+ }
+
+ // If there are conflicts, throw exception with all conflicting fields
+ if (!conflictingFields.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The http.request.additional-body-json option should not override join keys, "
+ + "as join keys are expected to target different enrichments on a request basis. "
+ + "Found conflicting field%s: %s",
+ conflictingFields.size() > 1 ? "s" : "",
Review Comment:
I'd drop this ternary, since it makes the message pretty hard to read, and
just use `field(s): %s`.
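A rough sketch of what the message construction could look like without the ternary. The class and the `conflictMessage` helper are hypothetical; the message text is copied from the diff, with only the pluralization replaced by a fixed `field(s)` label.

```java
import java.util.Set;

public class ConflictMessageSketch {

    // Hypothetical helper: builds the error message with a fixed "field(s)"
    // label instead of choosing the plural suffix at runtime.
    static String conflictMessage(Set<String> conflictingFields) {
        return String.format(
                "The http.request.additional-body-json option should not override join keys, "
                        + "as join keys are expected to target different enrichments on a request basis. "
                        + "Found conflicting field(s): %s",
                conflictingFields);
    }

    public static void main(String[] args) {
        // Sample conflicting field, assumed for illustration only.
        System.out.println(conflictMessage(Set.of("id")));
    }
}
```

This leaves `String.format` with a single argument, so the format string and its arguments are easier to match up when reading.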
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java:
##########
@@ -152,6 +178,67 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
- return Set.of(REQUEST_QUERY_PARAM_FIELDS, REQUEST_BODY_FIELDS, REQUEST_URL_MAP);
+ return Set.of(
+ REQUEST_QUERY_PARAM_FIELDS,
+ REQUEST_BODY_FIELDS,
+ REQUEST_URL_MAP,
+ REQUEST_ADDITIONAL_BODY_JSON);
+ }
+
+ /**
+ * Creates and validates the additional JSON node from configuration. This method parses the
+ * JSON once during factory creation to avoid re-parsing on every lookup request, improving
+ * runtime performance.
+ *
+ * @param requestBodyFields the list of request body field names (join keys)
+ * @param additionalRequestJson the additional JSON string to validate and parse
+ * @return the parsed ObjectNode, or null if no additional JSON is provided
+ * @throws IllegalArgumentException if the JSON is invalid or contains conflicting fields
+ */
+ private ObjectNode createAdditionalObjectNode(
+ List<String> requestBodyFields, String additionalRequestJson) {
+ if (additionalRequestJson == null || additionalRequestJson.trim().isEmpty()) {
+ return null;
+ }
+
+ try {
+ // Parse the additional JSON once to avoid re-parsing on every lookup
+ ObjectMapper mapper = ObjectMapperAdapter.instance();
+ JsonNode jsonNode = mapper.readTree(additionalRequestJson);
+
+ if (!jsonNode.isObject()) {
+ throw new IllegalArgumentException(
+ "The http.request.additional-body-json must be a valid JSON object.");
Review Comment:
I'd rather refer to the key as `REQUEST_ADDITIONAL_BODY_JSON.key()`, because
if that key changes a year from now for some reason, the hardcoded string may
go undetected. The same applies to the rest of the error messages.
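To illustrate the idea without pulling in Flink, here is a small sketch. The nested `Option` class is a hypothetical stand-in for Flink's `ConfigOption`, reduced to its `key()` accessor, and the key string is assumed from the error message in the diff rather than taken from the actual option definition.

```java
public class OptionKeyMessageSketch {

    // Hypothetical stand-in for Flink's ConfigOption; only key() is modeled.
    static final class Option {
        private final String key;

        Option(String key) {
            this.key = key;
        }

        String key() {
            return key;
        }
    }

    // Key string assumed from the error message text in the diff.
    static final Option REQUEST_ADDITIONAL_BODY_JSON =
            new Option("http.request.additional-body-json");

    // Derives the option name from the option itself, so renaming the key
    // updates the error message automatically.
    static String notAnObjectMessage() {
        return String.format(
                "The %s must be a valid JSON object.", REQUEST_ADDITIONAL_BODY_JSON.key());
    }

    public static void main(String[] args) {
        System.out.println(notAnObjectMessage());
    }
}
```

With this shape the key is spelled out in one place only, which is the point of the suggestion.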