This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bfd964b  NIFI-8176 - Move the timeout check after we process the response from Splunk to make sure we poll for acknowledgement at least once. (No need for flag.)
bfd964b is described below

commit bfd964b9c7fdac7190780dc03a5fe02ce754b66b
Author: Timea Barna <[email protected]>
AuthorDate: Tue Feb 16 08:40:35 2021 +0100

    NIFI-8176 - Move the timeout check after we process the response from Splunk to make sure we poll for acknowledgement at least once. (No need for flag.)
    
    This closes #4824.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../processors/splunk/QuerySplunkIndexingStatus.java   | 18 ++++++++++--------
 .../apache/nifi/processors/splunk/SplunkAPICall.java   | 14 +++++++++++---
 .../additionalDetails.html                             |  5 +++--
 .../splunk/TestQuerySplunkIndexingStatus.java          | 12 ------------
 4 files changed, 24 insertions(+), 25 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
index 9ed5210..747e5e4 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
@@ -167,8 +167,6 @@ public class QuerySplunkIndexingStatus extends 
SplunkAPICall {
                 getLogger().error("Flow file ({}) attributes {} and {} are 
expected to be set using 64-bit integer values!",
                         new Object[]{flowFile.getId(), 
SplunkAPICall.RESPONDED_AT_ATTRIBUTE, 
SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
                 session.transfer(flowFile, RELATIONSHIP_FAILURE);
-            } else if (sentAt.get() + ttl < currentTime) {
-                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
             } else {
                 undetermined.put(ackId.get(), flowFile);
             }
@@ -193,14 +191,18 @@ public class QuerySplunkIndexingStatus extends 
SplunkAPICall {
             if (responseMessage.getStatus() == 200) {
                 final EventIndexStatusResponse splunkResponse = 
unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
-                splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = 
undetermined.get(result.getKey());
-
-                    if (result.getValue()) {
+                splunkResponse.getAcks().forEach((flowFileId, isAcknowledged) 
-> {
+                    final FlowFile toTransfer = undetermined.get(flowFileId);
+                    if (isAcknowledged) {
                         session.transfer(toTransfer, 
RELATIONSHIP_ACKNOWLEDGED);
                     } else {
-                        session.penalize(toTransfer);
-                        session.transfer(toTransfer, 
RELATIONSHIP_UNDETERMINED);
+                        final Long sentAt = 
extractLong(toTransfer.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)).get();
+                        if (sentAt + ttl < currentTime) {
+                            session.transfer(toTransfer, 
RELATIONSHIP_UNACKNOWLEDGED);
+                        } else {
+                            session.penalize(toTransfer);
+                            session.transfer(toTransfer, 
RELATIONSHIP_UNDETERMINED);
+                        }
                     }
                 });
             } else {
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
index 46b6de8..8cf4f8a 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
@@ -54,6 +54,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
             .name("Scheme")
+            .displayName("Scheme")
             .description("The scheme for connecting to Splunk.")
             .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
             .defaultValue(HTTPS_SCHEME)
@@ -62,6 +63,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
             .name("Hostname")
+            .displayName("Hostname")
             .description("The ip address or hostname of the Splunk server.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .defaultValue("localhost")
@@ -71,15 +73,17 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor PORT = new PropertyDescriptor
             .Builder().name("Port")
-            .description("The HTTP Port Number of the Splunk server.")
+            .displayName("HTTP Event Collector Port")
+            .description("The HTTP Event Collector HTTP Port Number.")
             .required(true)
             .addValidator(StandardValidators.PORT_VALIDATOR)
-            .defaultValue("9088")
+            .defaultValue("8088")
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
     static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
             .name("Security Protocol")
+            .displayName("Security Protocol")
             .description("The security protocol to use for communicating with 
Splunk.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, 
SSL_3_VALUE)
@@ -88,6 +92,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
             .name("Owner")
+            .displayName("Owner")
             .description("The owner to pass to Splunk.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
@@ -96,7 +101,8 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
             .name("Token")
-            .description("The token to pass to Splunk.")
+            .displayName("HTTP Event Collector Token")
+            .description("HTTP Event Collector token starting with the string 
Splunk. For example Splunk 1234578-abcd-1234-abcd-1234abcd")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -104,6 +110,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
             .name("Username")
+            .displayName("Username")
             .description("The username to authenticate to Splunk.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
@@ -112,6 +119,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
             .name("Password")
+            .displayName("Password")
             .description("The password to authenticate to Splunk.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
index 9d81de8..50f6f87 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
@@ -46,8 +46,9 @@
     includes unsuccessful or ongoing indexing and unknown acknowledgement 
identifiers. In order to avoid infinite tries,
     QuerySplunkIndexingStatus gives user the possibility to set a "Maximum 
waiting time". Results with value of false from Splunk
     within the specified waiting time will be handled as "undetermined" and 
are transferred to the "undetermined" relationship.
-    Flow files outside of this time range will be transferred to the 
"unacknowledged" relationship next time the processor is
-    triggered. In order to determine if the indexing of a given event is 
within the waiting time, the Unix Epoch of the original
+    Flow files outside of this time range will be queried as well and be 
transferred to either "acknowledged" or "unacknowledged"
+    relationship determined by the Splunk response.
+    In order to determine if the indexing of a given event is within the 
waiting time, the Unix Epoch of the original
     Splunk response is stored in the attribute "splunk.responded.at". Setting 
"Maximum waiting time" too low might
     result some false negative result as in case under higher load, Splunk 
server might index slower than it is expected.
 </p>
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
index 2b91f17..bd33947 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
@@ -39,7 +39,6 @@ import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -121,17 +120,6 @@ public class TestQuerySplunkIndexingStatus {
     }
 
     @Test
-    public void testTimedOutEvents() throws Exception {
-        // when
-        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(2)));
-        testRunner.run();
-
-        // then
-        Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), 
Mockito.any(RequestMessage.class));
-        
testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED,
 1);
-    }
-
-    @Test
     public void testWhenFlowFileIsLackOfNecessaryAttributes() throws Exception 
{
         // when
         testRunner.enqueue(EVENT);

Reply via email to