This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fa79571 NIFI-9066 Supporting flow file attributes in PutSplunkHTTP;
Fixing endpoint assembly with extra query values
fa79571 is described below
commit fa79571e309b117d86fced82d7a962ffaad5e06a
Author: Bence Simon <[email protected]>
AuthorDate: Tue Aug 24 13:13:02 2021 +0200
NIFI-9066 Supporting flow file attributes in PutSplunkHTTP; Fixing endpoint
assembly with extra query values
This closes #5330.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../nifi/processors/splunk/PutSplunkHTTP.java | 126 +++++++++------------
.../nifi/processors/splunk/SplunkAPICall.java | 2 +-
.../nifi/processors/splunk/TestPutSplunkHTTP.java | 77 +++++++++++--
3 files changed, 126 insertions(+), 79 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
index 7a19e6f..861de32 100644
---
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
+++
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
@@ -28,8 +28,6 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dto.splunk.SendRawDataResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -50,11 +48,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"splunk", "logs", "http"})
@@ -74,7 +71,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
.description("User-defined event source. Sets a default for all
events when unspecified.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor SOURCE_TYPE = new
PropertyDescriptor.Builder()
@@ -83,7 +80,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
.description("User-defined event sourcetype. Sets a default for
all events when unspecified.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
@@ -92,7 +89,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
.description("Specify with the host query string parameter. Sets a
default for all events when unspecified.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
@@ -101,7 +98,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
.description("Index name. Specify with the index query string
parameter. Sets a default for all events when unspecified.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
@@ -111,7 +108,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(Charset.defaultCharset().name())
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor CONTENT_TYPE = new
PropertyDescriptor.Builder()
@@ -123,7 +120,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
"In case of neither of them is specified, this information
will not be sent to the server.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
@@ -157,61 +154,6 @@ public class PutSplunkHTTP extends SplunkAPICall {
return result;
}
- private volatile String endpoint;
- private volatile String contentType;
- private volatile String charset;
-
- @OnScheduled
- public void onScheduled(final ProcessContext context) {
- super.onScheduled(context);
-
- if (context.getProperty(CONTENT_TYPE).isSet()) {
- contentType =
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
- }
-
- charset =
context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
-
- final Map<String, String> queryParameters = new HashMap<>();
-
- if (context.getProperty(SOURCE_TYPE).isSet()) {
- queryParameters.put("sourcetype",
context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions().getValue());
- }
-
- if (context.getProperty(SOURCE).isSet()) {
- queryParameters.put("source",
context.getProperty(SOURCE).evaluateAttributeExpressions().getValue());
- }
-
- if (context.getProperty(HOST).isSet()) {
- queryParameters.put("host",
context.getProperty(HOST).evaluateAttributeExpressions().getValue());
- }
-
- if (context.getProperty(INDEX).isSet()) {
- queryParameters.put("index",
context.getProperty(INDEX).evaluateAttributeExpressions().getValue());
- }
-
- endpoint = getEndpoint(queryParameters);
- }
-
- private String getEndpoint(final Map<String, String> queryParameters) {
- if (queryParameters.isEmpty()) {
- return ENDPOINT;
- }
-
- try {
- return URLEncoder.encode(ENDPOINT + '?' +
queryParameters.entrySet().stream().map(e -> e.getKey() + '=' +
e.getValue()).collect(Collectors.joining("&")), "UTF-8");
- } catch (final UnsupportedEncodingException e) {
- getLogger().error("Could not be initialized because of: {}", new
Object[] {e.getMessage()}, e);
- throw new ProcessException(e);
- }
- }
-
- @OnStopped
- public void onStopped() {
- super.onStopped();
- contentType = null;
- endpoint = null;
- }
-
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
ResponseMessage responseMessage = null;
@@ -223,7 +165,8 @@ public class PutSplunkHTTP extends SplunkAPICall {
}
try {
- final RequestMessage requestMessage =
createRequestMessage(session, flowFile);
+ final String endpoint = getEndpoint(context, flowFile);
+ final RequestMessage requestMessage =
createRequestMessage(session, flowFile, context);
responseMessage = call(endpoint, requestMessage);
flowFile = session.putAttribute(flowFile, "splunk.status.code",
String.valueOf(responseMessage.getStatus()));
@@ -262,16 +205,19 @@ public class PutSplunkHTTP extends SplunkAPICall {
}
}
- private RequestMessage createRequestMessage(final ProcessSession session,
final FlowFile flowFile) {
+ protected RequestMessage createRequestMessage(final ProcessSession
session, final FlowFile flowFile, final ProcessContext context) {
final RequestMessage requestMessage = new RequestMessage("POST");
- final String flowFileContentType =
Optional.ofNullable(contentType).orElse(flowFile.getAttribute("mime.type"));
+ final String contentType = (context.getProperty(CONTENT_TYPE).isSet())
+ ?
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue()
+ : flowFile.getAttribute("mime.type");
- if (flowFileContentType != null) {
- requestMessage.getHeader().put("Content-Type",
flowFileContentType);
+ if (contentType != null) {
+ requestMessage.getHeader().put("Content-Type", contentType);
}
// The current version of Splunk's {@link com.splunk.Service} class is
lack of support for OutputStream as content.
// For further details please visit {@link
com.splunk.HttpService#send} which is called internally.
+ final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
requestMessage.setContent(extractTextMessageBody(flowFile, session,
charset));
return requestMessage;
}
@@ -288,4 +234,44 @@ public class PutSplunkHTTP extends SplunkAPICall {
attributes.put(SplunkAPICall.RESPONDED_AT_ATTRIBUTE,
String.valueOf(System.currentTimeMillis()));
return session.putAllAttributes(flowFile, attributes);
}
+
+ public String getEndpoint(final ProcessContext context, final FlowFile
flowFile) {
+ final Map<String, String> queryParameters = new HashMap<>();
+
+ if (context.getProperty(SOURCE_TYPE).isSet()) {
+ queryParameters.put("sourcetype",
context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue());
+ }
+
+ if (context.getProperty(SOURCE).isSet()) {
+ queryParameters.put("source",
context.getProperty(SOURCE).evaluateAttributeExpressions(flowFile).getValue());
+ }
+
+ if (context.getProperty(HOST).isSet()) {
+ queryParameters.put("host",
context.getProperty(HOST).evaluateAttributeExpressions(flowFile).getValue());
+ }
+
+ if (context.getProperty(INDEX).isSet()) {
+ queryParameters.put("index",
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue());
+ }
+
+ final StringBuilder result = new StringBuilder(ENDPOINT);
+
+ if (!queryParameters.isEmpty()) {
+ final List<String> parameters = new LinkedList<>();
+
+ try {
+ for (final Map.Entry<String, String> parameter :
queryParameters.entrySet()) {
+ parameters.add(URLEncoder.encode(parameter.getKey(),
"UTF-8") + '=' + URLEncoder.encode(parameter.getValue(), "UTF-8"));
+ }
+ } catch (final UnsupportedEncodingException e) {
+ getLogger().error("Could not be initialized because of: {}",
new Object[]{e.getMessage()}, e);
+ throw new ProcessException(e);
+ }
+
+ result.append('?');
+ result.append(String.join("&", parameters));
+ }
+
+ return result.toString();
+ }
}
diff --git
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
index 8cf4f8a..15fab48 100644
---
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
+++
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
@@ -102,7 +102,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
.name("Token")
.displayName("HTTP Event Collector Token")
- .description("HTTP Event Collector token starting with the string
Splunk. For example Splunk 1234578-abcd-1234-abcd-1234abcd")
+ .description("HTTP Event Collector token starting with the string
Splunk. For example \'Splunk 1234578-abcd-1234-abcd-1234abcd\'")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
diff --git
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java
index 8e30424..f526ad8 100644
---
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java
+++
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java
@@ -20,6 +20,9 @@ import com.splunk.RequestMessage;
import com.splunk.ResponseMessage;
import com.splunk.Service;
import com.splunk.ServiceArgs;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -29,6 +32,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@@ -36,12 +40,15 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
@RunWith(MockitoJUnitRunner.class)
public class TestPutSplunkHTTP {
private static final String ACK_ID = "1234";
- private static final String EVENT =
"{\"a\"=\"b\",\"c\"=\"d\",\"e\"=\"f\"}";
+ private static final String EVENT =
"{\"a\"=\"á\",\"c\"=\"ő\",\"e\"=\"'ű'\"}"; // Intentionally uses UTF-8 character
private static final String SUCCESS_RESPONSE =
"{\n" +
" \"text\": \"Success\",\n" +
@@ -59,12 +66,15 @@ public class TestPutSplunkHTTP {
@Mock
private ResponseMessage response;
- private MockedPutSplunkHTTP processor;
- private TestRunner testRunner;
-
+ @Captor
private ArgumentCaptor<String> path;
+
+ @Captor
private ArgumentCaptor<RequestMessage> request;
+ private MockedPutSplunkHTTP processor;
+ private TestRunner testRunner;
+
@Before
public void setUp() {
processor = new MockedPutSplunkHTTP(service);
@@ -73,8 +83,6 @@ public class TestPutSplunkHTTP {
testRunner.setProperty(SplunkAPICall.TOKEN, "Splunk
888c5a81-8777-49a0-a3af-f76e050ab5d9");
testRunner.setProperty(SplunkAPICall.REQUEST_CHANNEL,
"22bd7414-0d77-4c73-936d-c8f5d1b21862");
- path = ArgumentCaptor.forClass(String.class);
- request = ArgumentCaptor.forClass(RequestMessage.class);
Mockito.when(service.send(path.capture(),
request.capture())).thenReturn(response);
}
@@ -107,7 +115,7 @@ public class TestPutSplunkHTTP {
public void testHappyPathWithCustomQueryParameters() throws Exception {
// given
testRunner.setProperty(PutSplunkHTTP.SOURCE, "test_source");
- testRunner.setProperty(PutSplunkHTTP.SOURCE_TYPE, "test_source_type");
+ testRunner.setProperty(PutSplunkHTTP.SOURCE_TYPE, "test?source?type");
givenSplunkReturnsWithSuccess();
// when
@@ -116,7 +124,42 @@ public class TestPutSplunkHTTP {
// then
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1);
-
Assert.assertEquals("%2Fservices%2Fcollector%2Fraw%3Fsourcetype%3Dtest_source_type%26source%3Dtest_source",
path.getValue());
+
Assert.assertEquals("/services/collector/raw?sourcetype=test%3Fsource%3Ftype&source=test_source",
path.getValue());
+ }
+
+ @Test
+ public void testHappyPathWithCustomQueryParametersFromFlowFile() throws
Exception {
+ // given
+ testRunner.setProperty(PutSplunkHTTP.SOURCE, "${ff_source}");
+ testRunner.setProperty(PutSplunkHTTP.SOURCE_TYPE, "${ff_source_type}");
+ testRunner.setProperty(PutSplunkHTTP.HOST, "${ff_host}");
+ testRunner.setProperty(PutSplunkHTTP.INDEX, "${ff_index}");
+ testRunner.setProperty(PutSplunkHTTP.CHARSET, "${ff_charset}");
+ testRunner.setProperty(PutSplunkHTTP.CONTENT_TYPE,
"${ff_content_type}");
+ givenSplunkReturnsWithSuccess();
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("ff_source", "test_source");
+ attributes.put("ff_source_type", "test?source?type");
+ attributes.put("ff_host", "test_host");
+ attributes.put("ff_index", "test_index");
+ attributes.put("ff_charset", "UTF-8");
+ attributes.put("ff_content_type", "test_content_type");
+
+ final MockFlowFile incomingFlowFile = new MockFlowFile(1);
+ incomingFlowFile.putAttributes(attributes);
+ incomingFlowFile.setData(EVENT.getBytes(StandardCharsets.UTF_8));
+
+ // when
+ testRunner.enqueue(incomingFlowFile);
+ testRunner.run();
+
+ // then
+
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1);
+
Assert.assertEquals("/services/collector/raw?host=test_host&index=test_index&sourcetype=test%3Fsource%3Ftype&source=test_source",
path.getValue());
+
+ Assert.assertEquals(EVENT, processor.getLastContent());
+ Assert.assertEquals(attributes.get("ff_content_type"),
processor.getLastContentType());
}
@Test
@@ -202,6 +245,8 @@ public class TestPutSplunkHTTP {
public static class MockedPutSplunkHTTP extends PutSplunkHTTP {
final Service serviceMock;
+ Object lastContent = null;
+ String lastContentType = null;
public MockedPutSplunkHTTP(final Service serviceMock) {
this.serviceMock = serviceMock;
@@ -211,5 +256,21 @@ public class TestPutSplunkHTTP {
protected Service getSplunkService(final ServiceArgs
splunkServiceArguments) {
return serviceMock;
}
+
+ @Override
+ protected RequestMessage createRequestMessage(final ProcessSession
session, final FlowFile flowFile, final ProcessContext context) {
+ final RequestMessage requestMessage =
super.createRequestMessage(session, flowFile, context);
+ lastContent = requestMessage.getContent();
+ lastContentType = requestMessage.getHeader().get("Content-Type");
+ return requestMessage;
+ }
+
+ public Object getLastContent() {
+ return lastContent;
+ }
+
+ public String getLastContentType() {
+ return lastContentType;
+ }
}
}