tpalfy commented on a change in pull request #5330:
URL: https://github.com/apache/nifi/pull/5330#discussion_r697463826
##########
File path:
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -223,8 +166,9 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
try {
- final RequestMessage requestMessage =
createRequestMessage(session, flowFile);
- responseMessage = call(endpoint, requestMessage);
+ final RequestDetails requestDetails =
RequestDetails.getInstance(getLogger(), context, flowFile);
+ final RequestMessage requestMessage =
createRequestMessage(session, flowFile, requestDetails);
+ responseMessage = call(requestDetails.getEndpoint(),
requestMessage);
Review comment:
We could save the addition of the `RequestDetails` class if here we
simply created the `endpoint` only and leave the handling of `contentType` and
`charset` to the `createRequestMessage`.
```suggestion
RequestMessage requestMessage = createRequestMessage(context,
session, flowFile);
responseMessage = call(buildEndpoint(context, flowFile),
requestMessage);
```
##########
File path:
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -288,4 +232,76 @@ private FlowFile enrichFlowFile(final ProcessSession
session, final FlowFile flo
attributes.put(SplunkAPICall.RESPONDED_AT_ATTRIBUTE,
String.valueOf(System.currentTimeMillis()));
return session.putAllAttributes(flowFile, attributes);
}
+
+ private static class RequestDetails {
+ private static final String ENDPOINT = "/services/collector/raw";
+
+ private final String endpoint;
+ private final String contentType;
+ private final String charset;
+
+ private RequestDetails(final String endpoint, final String
contentType, final String charset) {
+ this.endpoint = endpoint;
+ this.contentType = contentType;
+ this.charset = charset;
+ }
+
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public String getCharset() {
+ return charset;
+ }
+
+ public static RequestDetails getInstance(final ComponentLog logger,
final ProcessContext context, final FlowFile flowFile) {
+ final String contentType =
(context.getProperty(CONTENT_TYPE).isSet()) ?
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue()
: null;
+ final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+ final Map<String, String> queryParameters = new HashMap<>();
+
+ if (context.getProperty(SOURCE_TYPE).isSet()) {
+ queryParameters.put("sourcetype",
context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue());
+ }
+
+ if (context.getProperty(SOURCE).isSet()) {
+ queryParameters.put("source",
context.getProperty(SOURCE).evaluateAttributeExpressions(flowFile).getValue());
+ }
+
+ if (context.getProperty(HOST).isSet()) {
+ queryParameters.put("host",
context.getProperty(HOST).evaluateAttributeExpressions(flowFile).getValue());
+ }
+
+ if (context.getProperty(INDEX).isSet()) {
+ queryParameters.put("index",
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue());
+ }
+
+ return new RequestDetails(getEndpoint(logger, queryParameters),
contentType, charset);
+ }
+
+ private static String getEndpoint(final ComponentLog logger, final
Map<String, String> queryParameters) {
+ final StringBuilder result = new StringBuilder(ENDPOINT);
+
+ if (!queryParameters.isEmpty()) {
+ final List<String> parameters = new LinkedList<>();
+
+ try {
+ for (final Map.Entry<String, String> parameter :
queryParameters.entrySet()) {
+ parameters.add(URLEncoder.encode(parameter.getKey(),
"UTF-8") + '=' + URLEncoder.encode(parameter.getValue(), "UTF-8"));
+ }
+ } catch (final UnsupportedEncodingException e) {
+ logger.error("Could not be initialized because of: {}",
new Object[]{e.getMessage()}, e);
+ throw new ProcessException(e);
+ }
+
+ result.append('?');
+ result.append(String.join("&", parameters));
+ }
+
+ return result.toString();
+ }
+ }
Review comment:
```suggestion
private String buildEndpoint(ProcessContext context, FlowFile flowFile) {
final Map<String, String> queryParameters = new HashMap<>();
if (context.getProperty(SOURCE_TYPE).isSet()) {
queryParameters.put("sourcetype",
context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue());
}
if (context.getProperty(SOURCE).isSet()) {
queryParameters.put("source",
context.getProperty(SOURCE).evaluateAttributeExpressions(flowFile).getValue());
}
if (context.getProperty(HOST).isSet()) {
queryParameters.put("host",
context.getProperty(HOST).evaluateAttributeExpressions(flowFile).getValue());
}
if (context.getProperty(INDEX).isSet()) {
queryParameters.put("index",
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue());
}
final StringBuilder result = new StringBuilder(ENDPOINT);
if (!queryParameters.isEmpty()) {
final List<String> parameters = new LinkedList<>();
try {
for (final Map.Entry<String, String> parameter :
queryParameters.entrySet()) {
parameters.add(URLEncoder.encode(parameter.getKey(),
"UTF-8") + '=' + URLEncoder.encode(parameter.getValue(), "UTF-8"));
}
} catch (final UnsupportedEncodingException e) {
getLogger().error("Could not be initialized because of: {}",
e.getMessage(), e);
throw new ProcessException(e);
}
result.append('?');
result.append(String.join("&", parameters));
}
return result.toString();
}
private RequestMessage createRequestMessage(ProcessContext context,
final ProcessSession session, final FlowFile flowFile) {
final RequestMessage requestMessage = new RequestMessage("POST");
final String flowFileContentType =
(context.getProperty(CONTENT_TYPE).isSet()) ?
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue()
:
flowFile.getAttribute("mime.type");
if (flowFileContentType != null) {
requestMessage.getHeader().put("Content-Type",
flowFileContentType);
}
final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
// The current version of Splunk's {@link com.splunk.Service} class
is lack of support for OutputStream as content.
// For further details please visit {@link
com.splunk.HttpService#send} which is called internally.
requestMessage.setContent(extractTextMessageBody(flowFile, session,
charset));
return requestMessage;
}
```
##########
File path:
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java
##########
@@ -116,7 +118,30 @@ public void testHappyPathWithCustomQueryParameters()
throws Exception {
// then
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1);
-
Assert.assertEquals("%2Fservices%2Fcollector%2Fraw%3Fsourcetype%3Dtest_source_type%26source%3Dtest_source",
path.getValue());
+
Assert.assertEquals("/services/collector/raw?sourcetype=test%3Fsource%3Ftype&source=test_source",
path.getValue());
+ }
+
+ @Test
+ public void testHappyPathWithCustomQueryParametersFromFlowFile() throws
Exception {
Review comment:
We could benefit from a test that covers all properties not just
`sourcetype` and `source`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]