This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new fa79571  NIFI-9066 Supporting flow file attributes in PutSplunkHTTP; 
Fixing endpoint assembly with extra query values
fa79571 is described below

commit fa79571e309b117d86fced82d7a962ffaad5e06a
Author: Bence Simon <[email protected]>
AuthorDate: Tue Aug 24 13:13:02 2021 +0200

    NIFI-9066 Supporting flow file attributes in PutSplunkHTTP; Fixing endpoint 
assembly with extra query values
    
    This closes #5330.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../nifi/processors/splunk/PutSplunkHTTP.java      | 126 +++++++++------------
 .../nifi/processors/splunk/SplunkAPICall.java      |   2 +-
 .../nifi/processors/splunk/TestPutSplunkHTTP.java  |  77 +++++++++++--
 3 files changed, 126 insertions(+), 79 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
index 7a19e6f..861de32 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
@@ -28,8 +28,6 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dto.splunk.SendRawDataResponse;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -50,11 +48,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"splunk", "logs", "http"})
@@ -74,7 +71,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
             .description("User-defined event source. Sets a default for all 
events when unspecified.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     static final PropertyDescriptor SOURCE_TYPE = new 
PropertyDescriptor.Builder()
@@ -83,7 +80,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
             .description("User-defined event sourcetype. Sets a default for 
all events when unspecified.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
@@ -92,7 +89,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
             .description("Specify with the host query string parameter. Sets a 
default for all events when unspecified.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
@@ -101,7 +98,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
             .description("Index name. Specify with the index query string 
parameter. Sets a default for all events when unspecified.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
@@ -111,7 +108,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
             .required(true)
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .defaultValue(Charset.defaultCharset().name())
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     static final PropertyDescriptor CONTENT_TYPE = new 
PropertyDescriptor.Builder()
@@ -123,7 +120,7 @@ public class PutSplunkHTTP extends SplunkAPICall {
                     "In case of neither of them is specified, this information 
will not be sent to the server.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
@@ -157,61 +154,6 @@ public class PutSplunkHTTP extends SplunkAPICall {
         return result;
     }
 
-    private volatile String endpoint;
-    private volatile String contentType;
-    private volatile String charset;
-
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) {
-        super.onScheduled(context);
-
-        if (context.getProperty(CONTENT_TYPE).isSet()) {
-            contentType = 
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
-        }
-
-        charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
-
-        final Map<String, String> queryParameters = new HashMap<>();
-
-        if (context.getProperty(SOURCE_TYPE).isSet()) {
-            queryParameters.put("sourcetype", 
context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions().getValue());
-        }
-
-        if (context.getProperty(SOURCE).isSet()) {
-            queryParameters.put("source", 
context.getProperty(SOURCE).evaluateAttributeExpressions().getValue());
-        }
-
-        if (context.getProperty(HOST).isSet()) {
-            queryParameters.put("host", 
context.getProperty(HOST).evaluateAttributeExpressions().getValue());
-        }
-
-        if (context.getProperty(INDEX).isSet()) {
-            queryParameters.put("index", 
context.getProperty(INDEX).evaluateAttributeExpressions().getValue());
-        }
-
-        endpoint = getEndpoint(queryParameters);
-    }
-
-    private String getEndpoint(final Map<String, String> queryParameters) {
-        if (queryParameters.isEmpty()) {
-            return ENDPOINT;
-        }
-
-        try {
-            return URLEncoder.encode(ENDPOINT + '?' + 
queryParameters.entrySet().stream().map(e -> e.getKey() + '=' + 
e.getValue()).collect(Collectors.joining("&")), "UTF-8");
-        } catch (final UnsupportedEncodingException e) {
-            getLogger().error("Could not be initialized because of: {}", new 
Object[] {e.getMessage()}, e);
-            throw new ProcessException(e);
-        }
-    }
-
-    @OnStopped
-    public void onStopped() {
-        super.onStopped();
-        contentType = null;
-        endpoint = null;
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         ResponseMessage responseMessage = null;
@@ -223,7 +165,8 @@ public class PutSplunkHTTP extends SplunkAPICall {
         }
 
         try {
-            final RequestMessage requestMessage = 
createRequestMessage(session, flowFile);
+            final String endpoint = getEndpoint(context, flowFile);
+            final RequestMessage requestMessage = 
createRequestMessage(session, flowFile, context);
             responseMessage = call(endpoint, requestMessage);
             flowFile = session.putAttribute(flowFile, "splunk.status.code", 
String.valueOf(responseMessage.getStatus()));
 
@@ -262,16 +205,19 @@ public class PutSplunkHTTP extends SplunkAPICall {
         }
     }
 
-    private RequestMessage createRequestMessage(final ProcessSession session, 
final FlowFile flowFile) {
+    protected RequestMessage createRequestMessage(final ProcessSession 
session, final FlowFile flowFile, final ProcessContext context) {
         final RequestMessage requestMessage = new RequestMessage("POST");
-        final String flowFileContentType = 
Optional.ofNullable(contentType).orElse(flowFile.getAttribute("mime.type"));
+        final String contentType = (context.getProperty(CONTENT_TYPE).isSet())
+                ? 
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue()
+                : flowFile.getAttribute("mime.type");
 
-        if (flowFileContentType != null) {
-            requestMessage.getHeader().put("Content-Type", 
flowFileContentType);
+        if (contentType != null) {
+            requestMessage.getHeader().put("Content-Type", contentType);
         }
 
         // The current version of Splunk's {@link com.splunk.Service} class is 
lack of support for OutputStream as content.
         // For further details please visit {@link 
com.splunk.HttpService#send} which is called internally.
+        final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
         requestMessage.setContent(extractTextMessageBody(flowFile, session, 
charset));
         return requestMessage;
     }
@@ -288,4 +234,44 @@ public class PutSplunkHTTP extends SplunkAPICall {
         attributes.put(SplunkAPICall.RESPONDED_AT_ATTRIBUTE, 
String.valueOf(System.currentTimeMillis()));
         return session.putAllAttributes(flowFile, attributes);
     }
+
+    public String getEndpoint(final ProcessContext context, final FlowFile 
flowFile) {
+        final Map<String, String> queryParameters = new HashMap<>();
+
+        if (context.getProperty(SOURCE_TYPE).isSet()) {
+            queryParameters.put("sourcetype", 
context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue());
+        }
+
+        if (context.getProperty(SOURCE).isSet()) {
+            queryParameters.put("source", 
context.getProperty(SOURCE).evaluateAttributeExpressions(flowFile).getValue());
+        }
+
+        if (context.getProperty(HOST).isSet()) {
+            queryParameters.put("host", 
context.getProperty(HOST).evaluateAttributeExpressions(flowFile).getValue());
+        }
+
+        if (context.getProperty(INDEX).isSet()) {
+            queryParameters.put("index", 
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue());
+        }
+
+        final StringBuilder result = new StringBuilder(ENDPOINT);
+
+        if (!queryParameters.isEmpty()) {
+            final List<String> parameters = new LinkedList<>();
+
+            try {
+                for (final Map.Entry<String, String> parameter : 
queryParameters.entrySet()) {
+                    parameters.add(URLEncoder.encode(parameter.getKey(), 
"UTF-8") + '=' + URLEncoder.encode(parameter.getValue(), "UTF-8"));
+                }
+            } catch (final UnsupportedEncodingException e) {
+                getLogger().error("Could not be initialized because of: {}", 
new Object[]{e.getMessage()}, e);
+                throw new ProcessException(e);
+            }
+
+            result.append('?');
+            result.append(String.join("&", parameters));
+        }
+
+        return result.toString();
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
index 8cf4f8a..15fab48 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
@@ -102,7 +102,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
     static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
             .name("Token")
             .displayName("HTTP Event Collector Token")
-            .description("HTTP Event Collector token starting with the string 
Splunk. For example Splunk 1234578-abcd-1234-abcd-1234abcd")
+            .description("HTTP Event Collector token starting with the string 
Splunk. For example \'Splunk 1234578-abcd-1234-abcd-1234abcd\'")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java
index 8e30424..f526ad8 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java
@@ -20,6 +20,9 @@ import com.splunk.RequestMessage;
 import com.splunk.ResponseMessage;
 import com.splunk.Service;
 import com.splunk.ServiceArgs;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -29,6 +32,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -36,12 +40,15 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TestPutSplunkHTTP {
     private static final String ACK_ID = "1234";
-    private static final String EVENT = 
"{\"a\"=\"b\",\"c\"=\"d\",\"e\"=\"f\"}";
+    private static final String EVENT = 
"{\"a\"=\"á\",\"c\"=\"ő\",\"e\"=\"'ű'\"}"; // Intentionally uses UTF-8 character
     private static final String SUCCESS_RESPONSE =
             "{\n" +
             "    \"text\": \"Success\",\n" +
@@ -59,12 +66,15 @@ public class TestPutSplunkHTTP {
     @Mock
     private ResponseMessage response;
 
-    private MockedPutSplunkHTTP processor;
-    private TestRunner testRunner;
-
+    @Captor
     private ArgumentCaptor<String> path;
+
+    @Captor
     private ArgumentCaptor<RequestMessage> request;
 
+    private MockedPutSplunkHTTP processor;
+    private TestRunner testRunner;
+
     @Before
     public void setUp() {
         processor = new MockedPutSplunkHTTP(service);
@@ -73,8 +83,6 @@ public class TestPutSplunkHTTP {
         testRunner.setProperty(SplunkAPICall.TOKEN, "Splunk 
888c5a81-8777-49a0-a3af-f76e050ab5d9");
         testRunner.setProperty(SplunkAPICall.REQUEST_CHANNEL, 
"22bd7414-0d77-4c73-936d-c8f5d1b21862");
 
-        path = ArgumentCaptor.forClass(String.class);
-        request = ArgumentCaptor.forClass(RequestMessage.class);
         Mockito.when(service.send(path.capture(), 
request.capture())).thenReturn(response);
     }
 
@@ -107,7 +115,7 @@ public class TestPutSplunkHTTP {
     public void testHappyPathWithCustomQueryParameters() throws Exception {
         // given
         testRunner.setProperty(PutSplunkHTTP.SOURCE, "test_source");
-        testRunner.setProperty(PutSplunkHTTP.SOURCE_TYPE, "test_source_type");
+        testRunner.setProperty(PutSplunkHTTP.SOURCE_TYPE, "test?source?type");
         givenSplunkReturnsWithSuccess();
 
         // when
@@ -116,7 +124,42 @@ public class TestPutSplunkHTTP {
 
         // then
         
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1);
-        
Assert.assertEquals("%2Fservices%2Fcollector%2Fraw%3Fsourcetype%3Dtest_source_type%26source%3Dtest_source",
 path.getValue());
+        
Assert.assertEquals("/services/collector/raw?sourcetype=test%3Fsource%3Ftype&source=test_source",
 path.getValue());
+    }
+
+    @Test
+    public void testHappyPathWithCustomQueryParametersFromFlowFile() throws 
Exception {
+        // given
+        testRunner.setProperty(PutSplunkHTTP.SOURCE, "${ff_source}");
+        testRunner.setProperty(PutSplunkHTTP.SOURCE_TYPE, "${ff_source_type}");
+        testRunner.setProperty(PutSplunkHTTP.HOST, "${ff_host}");
+        testRunner.setProperty(PutSplunkHTTP.INDEX, "${ff_index}");
+        testRunner.setProperty(PutSplunkHTTP.CHARSET, "${ff_charset}");
+        testRunner.setProperty(PutSplunkHTTP.CONTENT_TYPE, 
"${ff_content_type}");
+        givenSplunkReturnsWithSuccess();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ff_source", "test_source");
+        attributes.put("ff_source_type", "test?source?type");
+        attributes.put("ff_host", "test_host");
+        attributes.put("ff_index", "test_index");
+        attributes.put("ff_charset", "UTF-8");
+        attributes.put("ff_content_type", "test_content_type");
+
+        final MockFlowFile incomingFlowFile = new MockFlowFile(1);
+        incomingFlowFile.putAttributes(attributes);
+        incomingFlowFile.setData(EVENT.getBytes(StandardCharsets.UTF_8));
+
+        // when
+        testRunner.enqueue(incomingFlowFile);
+        testRunner.run();
+
+        // then
+        
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1);
+        
Assert.assertEquals("/services/collector/raw?host=test_host&index=test_index&sourcetype=test%3Fsource%3Ftype&source=test_source",
 path.getValue());
+
+        Assert.assertEquals(EVENT, processor.getLastContent());
+        Assert.assertEquals(attributes.get("ff_content_type"), 
processor.getLastContentType());
     }
 
     @Test
@@ -202,6 +245,8 @@ public class TestPutSplunkHTTP {
 
     public static class MockedPutSplunkHTTP extends PutSplunkHTTP {
         final Service serviceMock;
+        Object lastContent = null;
+        String lastContentType = null;
 
         public MockedPutSplunkHTTP(final Service serviceMock) {
             this.serviceMock = serviceMock;
@@ -211,5 +256,21 @@ public class TestPutSplunkHTTP {
         protected Service getSplunkService(final ServiceArgs 
splunkServiceArguments) {
             return serviceMock;
         }
+
+        @Override
+        protected RequestMessage createRequestMessage(final ProcessSession 
session, final FlowFile flowFile, final ProcessContext context) {
+            final RequestMessage requestMessage = 
super.createRequestMessage(session, flowFile, context);
+            lastContent = requestMessage.getContent();
+            lastContentType = requestMessage.getHeader().get("Content-Type");
+            return requestMessage;
+        }
+
+        public Object getLastContent() {
+            return lastContent;
+        }
+
+        public String getLastContentType() {
+            return lastContentType;
+        }
     }
 }

Reply via email to