markap14 commented on a change in pull request #4714:
URL: https://github.com/apache/nifi/pull/4714#discussion_r563904084



##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status 
of indexing acknowledgement.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = 
"The indexing acknowledgement id provided by Splunk."),
+        @ReadsAttribute(attribute = "splunk.responded.at", description = "The 
time of the response of put request for Splunk.")})
+@SeeAlso(PutSplunkHTTP.class)
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new 
Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred to this relationship when 
the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new 
Relationship.Builder()
+            .name("unacknowledged")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement was not successful." +
+                    "This can happen when the acknowledgement did not happened 
within the time period set for Maximum Waiting Time. " +
+                    "FlowFiles with acknowledgement id unknown for the Splunk 
server will be transferred to this relationship after the Maximum Waiting Time 
is reached.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new 
Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement state is not determined. " +
+                    "FlowFiles transferred to this relationship might be 
penalized! " +

Review comment:
       No reason to add an exclamation mark here - it is a very normal thing. 
Under what conditions would they be penalized? An indication that they "might 
be penalized" leaves me with more questions than answers, as a user.

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status 
of indexing acknowledgement.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = 
"The indexing acknowledgement id provided by Splunk."),
+        @ReadsAttribute(attribute = "splunk.responded.at", description = "The 
time of the response of put request for Splunk.")})
+@SeeAlso(PutSplunkHTTP.class)
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new 
Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred to this relationship when 
the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new 
Relationship.Builder()
+            .name("unacknowledged")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement was not successful." +

Review comment:
       nit: Missing a space after the period.

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status 
of indexing acknowledgement.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = 
"The indexing acknowledgement id provided by Splunk."),
+        @ReadsAttribute(attribute = "splunk.responded.at", description = "The 
time of the response of put request for Splunk.")})
+@SeeAlso(PutSplunkHTTP.class)
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new 
Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred to this relationship when 
the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new 
Relationship.Builder()
+            .name("unacknowledged")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement was not successful." +
+                    "This can happen when the acknowledgement did not happened 
within the time period set for Maximum Waiting Time. " +
+                    "FlowFiles with acknowledgement id unknown for the Splunk 
server will be transferred to this relationship after the Maximum Waiting Time 
is reached.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new 
Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement state is not determined. " +
+                    "FlowFiles transferred to this relationship might be 
penalized! " +
+                    "This happens when Splunk returns with HTTP 200 but with 
false response for the acknowledgement id in the flow file attribute.")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement was not successful due to errors during the communication. " +
+                    "FlowFiles are timing out or unknown by the Splunk server 
will transferred to \"undetermined\" relationship.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum Waiting Time")
+            .description(
+                    "The maximum time the processor tries to acquire 
acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the processor considers 
the index as not acknowledged and transfers the FlowFile to the 
\"unacknowledged\" relationship.")
+            .defaultValue("1 hour")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the 
outgoing query contains in one batch. " +
+                    "It is recommended not to set it too low in order to 
reduce network communication.")
+            .defaultValue("10000")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = 
super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = 
context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnStopped
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;
+        ttl = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final RequestMessage requestMessage;
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);
+
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final long currentTime = System.currentTimeMillis();
+        final Map<Long, FlowFile> undetermined = new HashMap<>();
+
+        for (final FlowFile flowFile : flowFiles)  {
+            final Optional<Long> sentAt = 
extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
+            final Optional<Long> ackId = 
extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
+
+            if (!sentAt.isPresent() || !ackId.isPresent()) {
+                getLogger().error("Flow file ({}) attributes {} and {} are 
expected to be set using 64-bit integer values!",
+                        new Object[]{flowFile.getId(), 
SplunkAPICall.RESPONDED_AT_ATTRIBUTE, 
SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            } else if (sentAt.get() + ttl < currentTime) {
+                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);

Review comment:
       This seems like the wrong behavior to me. If the data is put to Splunk, 
and then sent to this processor, but sits in the queue for a while (due to the 
processor being stopped or having a long backlog, etc.), then the FlowFile will 
immediately go to Unacknowledged, even though Splunk was never queried for the 
status. Are we sure that's what we want to happen here?

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.SendRawDataResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http"})
+@CapabilityDescription("Sends flow file content to the specified Splunk server 
over HTTP or HTTPS. Supports HEC Index Acknowledgement.")
+@ReadsAttribute(attribute = "mime.type", description = "Uses as value for HTTP 
Content-Type header if set.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "splunk.acknowledgement.id", description 
= "The indexing acknowledgement id provided by Splunk."),
+        @WritesAttribute(attribute = "splunk.responded.at", description = "The 
time of the response of put request for Splunk.")})
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@SeeAlso(QuerySplunkIndexingStatus.class)
+public class PutSplunkHTTP extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/raw";
+
+    static final PropertyDescriptor SOURCE = new PropertyDescriptor.Builder()
+            .name("source")
+            .displayName("Source")
+            .description("User-defined event source. Sets a default for all 
events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SOURCE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("source-type")
+            .displayName("Source Type")
+            .description("User-defined event sourcetype. Sets a default for 
all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("host")
+            .displayName("Host")
+            .description("Specify with the host query string parameter. Sets a 
default for all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("index")
+            .displayName("Index")
+            .description("Index name. Specify with the index query string 
parameter. Sets a default for all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("character-set")
+            .displayName("Character Set")
+            .description("The name of the character set.")
+            .required(true)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue(Charset.defaultCharset().name())
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor CONTENT_TYPE = new 
PropertyDescriptor.Builder()
+            .name("content-type")
+            .displayName("Content Type")
+            .description(
+                    "The media type of the event sent to Splunk. " +
+                    "If not set, \"mime.type\" flow file attribute will be 
used. " +
+                    "In case of neither of them is specified, this information 
will not be sent to the server.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are sent successfully to the 
destination are sent to this relationship.")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed to send to the destination are 
sent to this relationship.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_SUCCESS,
+            RELATIONSHIP_FAILURE)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        result.add(SOURCE);
+        result.add(SOURCE_TYPE);
+        result.add(HOST);
+        result.add(INDEX);
+        result.add(CONTENT_TYPE);
+        result.add(CHARSET);
+        return result;
+    }
+
+    private volatile String endpoint;
+    private volatile String contentType;
+    private volatile String charset;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+
+        if (context.getProperty(CONTENT_TYPE).isSet()) {
+            contentType = 
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
+        }
+
+        charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
+
+        final Map<String, String> queryParameters = new HashMap<>();
+
+        if (context.getProperty(SOURCE_TYPE).isSet()) {
+            queryParameters.put("sourcetype", 
context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(SOURCE).isSet()) {
+            queryParameters.put("source", 
context.getProperty(SOURCE).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(HOST).isSet()) {
+            queryParameters.put("host", 
context.getProperty(HOST).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(INDEX).isSet()) {
+            queryParameters.put("index", 
context.getProperty(INDEX).evaluateAttributeExpressions().getValue());
+        }
+
+        endpoint = getEndpoint(queryParameters);
+    }
+
+    private String getEndpoint(final Map<String, String> queryParameters) {
+        if (queryParameters.isEmpty()) {
+            return ENDPOINT;
+        }
+
+        try {
+            return URLEncoder.encode(ENDPOINT + '?' + 
queryParameters.entrySet().stream().map(e -> e.getKey() + '=' + 
e.getValue()).collect(Collectors.joining("&")), "UTF-8");
+        } catch (final UnsupportedEncodingException e) {
+            getLogger().error("Could not be initialized because of: {}", new 
Object[] {e.getMessage()}, e);
+            throw new ProcessException(e);
+        }
+    }
+
+    @OnStopped
+    public void onUnscheduled() {

Review comment:
       Probably should rename method to `onStopped` instead of `onUnscheduled`.

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status 
of indexing acknowledgement.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = 
"The indexing acknowledgement id provided by Splunk."),
+        @ReadsAttribute(attribute = "splunk.responded.at", description = "The 
time of the response of put request for Splunk.")})
+@SeeAlso(PutSplunkHTTP.class)
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new 
Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred to this relationship when 
the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new 
Relationship.Builder()
+            .name("unacknowledged")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement was not successful." +
+                    "This can happen when the acknowledgement did not happened 
within the time period set for Maximum Waiting Time. " +
+                    "FlowFiles with acknowledgement id unknown for the Splunk 
server will be transferred to this relationship after the Maximum Waiting Time 
is reached.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new 
Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement state is not determined. " +
+                    "FlowFiles transferred to this relationship might be 
penalized! " +
+                    "This happens when Splunk returns with HTTP 200 but with 
false response for the acknowledgement id in the flow file attribute.")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement was not successful due to errors during the communication. " +
+                    "FlowFiles are timing out or unknown by the Splunk server 
will transferred to \"undetermined\" relationship.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum Waiting Time")
+            .description(
+                    "The maximum time the processor tries to acquire 
acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the processor considers 
the index as not acknowledged and transfers the FlowFile to the 
\"unacknowledged\" relationship.")
+            .defaultValue("1 hour")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the 
outgoing query contains in one batch. " +
+                    "It is recommended not to set it too low in order to 
reduce network communication.")
+            .defaultValue("10000")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = 
super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = 
context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnStopped
+    public void onUnscheduled() {

Review comment:
       Probably should name the method `onStopped` rather than `onUnscheduled`.

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>QuerySplunkIndexingStatus</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" 
type="text/css" />
+</head>
+
+<body>
+<h2>QuerySplunkIndexingStatus</h2>
+
+<p>
+    This processor is responsible for polling Splunk server and determine if a 
Splunk event is acknowledged at the time of
+    execution. For more details about the HEC Index Acknowledgement please see
+    <a 
href="https://docs.splunk.com/Documentation/Splunk/LATEST/Data/AboutHECIDXAck";>this
 documentation.</a>
+</p>
+
+<h3>Prerequisites</h3>
+
+<p>
+    In order to work properly, the incoming flow files need to have the 
attributes "splunk.acknowledgement.id" and
+    "splunk.responded.at" filled properly. The flow file attribute 
+    "splunk.acknowledgement.id" should contain the "ackId"
+    contained by the response of the Splunk from the original put call. The 
flow file attribute "splunk.responded.at"
+    should contain the Unix Epoch the put call was answered by Splunk. It is 
suggested to use PutSplunkHTTP processor to execute
+    the put call and set these attributes.
+</p>
+
+<h3>Unacknowledged and undetermined cases</h3>
+
+<p>
+    Splunk serves information only about successful acknowledgement. In every 
other case it will return a value of <code>false</code>. This

Review comment:
       Referring to this here as a "false value" makes it sound like a "fake 
value" or an "invalid value". Would recommend instead "... it will return a 
value of <code>false</code>."

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.HttpException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = 
"X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), 
SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), 
SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new 
AllowableValue(SSLSecurityProtocol.TLSv1.name(), 
SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new 
AllowableValue(SSLSecurityProtocol.SSLv3.name(), 
SSLSecurityProtocol.SSLv3.name());
+
+    static final String ACKNOWLEDGEMENT_ID_ATTRIBUTE = 
"splunk.acknowledgement.id";
+    static final String RESPONDED_AT_ATTRIBUTE = "splunk.responded.at";
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with 
Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, 
SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new 
PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL
+    );
+
+    private final JsonFactory jsonFactory = new JsonFactory();
+    private final ObjectMapper jsonObjectMapper = new 
ObjectMapper(jsonFactory);
+
+    private volatile ServiceArgs splunkServiceArguments;
+    private volatile Service splunkService;
+    private volatile String requestChannel;
+
    /**
     * Returns the common Splunk connection properties shared by every processor
     * extending this base class. Subclasses may extend this list with their own
     * properties by overriding this method and appending to the result.
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return SplunkAPICall.PROPERTIES;
    }
+
    /**
     * Initializes the Splunk client when the processor is scheduled: builds the
     * connection arguments from the configured properties, connects the service,
     * and caches the request channel identifier used as a header on later calls.
     * The service arguments are built first because the service connection depends
     * on them.
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        splunkServiceArguments = getSplunkServiceArgs(context);
        splunkService = getSplunkService(splunkServiceArguments);
        requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        
splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        
splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        
splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            
splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            
splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            
splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            
splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && 
context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            
splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
+        }
+
+        return splunkServiceArguments;
+    }
+
    /**
     * Creates a connected Splunk {@link Service} client for the given arguments.
     * NOTE(review): declared protected — presumably so tests or subclasses can
     * override it to supply a mock service; confirm.
     */
    protected Service getSplunkService(final ServiceArgs splunkServiceArguments) {
        return Service.connect(splunkServiceArguments);
    }
+
+    @OnStopped
+    public void onUnscheduled() {

Review comment:
       Should rename method to `onStopped`

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status 
of indexing acknowledgement.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = 
"The indexing acknowledgement id provided by Splunk."),
+        @ReadsAttribute(attribute = "splunk.responded.at", description = "The 
time of the response of put request for Splunk.")})
+@SeeAlso(PutSplunkHTTP.class)
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new 
Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred to this relationship when 
the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new 
Relationship.Builder()
+            .name("unacknowledged")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement was not successful." +
+                    "This can happen when the acknowledgement did not happened 
within the time period set for Maximum Waiting Time. " +
+                    "FlowFiles with acknowledgement id unknown for the Splunk 
server will be transferred to this relationship after the Maximum Waiting Time 
is reached.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new 
Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement state is not determined. " +
+                    "FlowFiles transferred to this relationship might be 
penalized! " +
+                    "This happens when Splunk returns with HTTP 200 but with 
false response for the acknowledgement id in the flow file attribute.")

Review comment:
       Not entirely sure what is being said here. "with a false response" 
sounds like the Splunk server is responding with an invalid or a "fake" 
response. Perhaps rephrasing to something along the lines of "but with a 
response indicating that the id has not been acknowledged"? I *think* that's 
what this is intended to convey.

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.PutSplunkHTTP/additionalDetails.html
##########
@@ -0,0 +1,75 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutSplunkHTTP</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" 
type="text/css" />
+</head>
+
+<body>
+<h2>PutSplunkHTTP</h2>
+
+<p>
+    This processor serves as a counterpart for PutSplunk processor. While the 
latter handles communication using TCP and
+    UDP protocols, PutSplunkHTTP aims to send events into Splunk via HTTP or 
HTTPS. In this fashion, this processor
+    shows similarities with GetSplunk processor and the properties relevant to 
the connection with Splunk server are
+    identical. There are however some aspects unique for this processor:
+</p>
+
+<h3>Content details</h3>
+
+<p>
+    PutSplunkHTTP allows the user to specify some metadata about the event 
being sent to Splunk. These include: the

Review comment:
       typo: should be "some metadata about the event being sent" - reads "some 
metadata about the event _is_ being sent"

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status 
of indexing acknowledgement.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = 
"The indexing acknowledgement id provided by Splunk."),
+        @ReadsAttribute(attribute = "splunk.responded.at", description = "The 
time of the response of put request for Splunk.")})
+@SeeAlso(PutSplunkHTTP.class)
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new 
Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred to this relationship when 
the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new 
Relationship.Builder()
+            .name("unacknowledged")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement was not successful." +
+                    "This can happen when the acknowledgement did not happened 
within the time period set for Maximum Waiting Time. " +
+                    "FlowFiles with acknowledgement id unknown for the Splunk 
server will be transferred to this relationship after the Maximum Waiting Time 
is reached.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new 
Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement state is not determined. " +
+                    "FlowFiles transferred to this relationship might be 
penalized! " +
+                    "This happens when Splunk returns with HTTP 200 but with 
false response for the acknowledgement id in the flow file attribute.")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description(
+                    "A FlowFile is transferred to this relationship when the 
acknowledgement was not successful due to errors during the communication. " +
+                    "FlowFiles are timing out or unknown by the Splunk server 
will transferred to \"undetermined\" relationship.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum Waiting Time")
+            .description(
+                    "The maximum time the processor tries to acquire 
acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the processor considers 
the index as not acknowledged and transfers the FlowFile to the 
\"unacknowledged\" relationship.")
+            .defaultValue("1 hour")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the 
outgoing query contains in one batch. " +
+                    "It is recommended not to set it too low in order to 
reduce network communication.")
+            .defaultValue("10000")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = 
super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
    /**
     * Returns the unmodifiable set of relationships supported by this processor.
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }
+
    /**
     * Caches the configured property values when the processor is scheduled so they
     * do not have to be re-read from the context on every trigger.
     * NOTE(review): the TTL milliseconds value is narrowed from long to int via
     * intValue(); waiting times above ~24.8 days would overflow — confirm whether
     * a long field would be safer.
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        super.onScheduled(context);
        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
        ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
    }
+
+    @OnStopped
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;
+        ttl = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final RequestMessage requestMessage;
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);
+
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final long currentTime = System.currentTimeMillis();
+        final Map<Long, FlowFile> undetermined = new HashMap<>();
+
+        for (final FlowFile flowFile : flowFiles)  {
+            final Optional<Long> sentAt = 
extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
+            final Optional<Long> ackId = 
extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
+
+            if (!sentAt.isPresent() || !ackId.isPresent()) {
+                getLogger().error("Flow file ({}) attributes {} and {} are 
expected to be set using 64-bit integer values!",
+                        new Object[]{flowFile.getId(), 
SplunkAPICall.RESPONDED_AT_ATTRIBUTE, 
SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            } else if (sentAt.get() + ttl < currentTime) {
+                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
+            } else {
+                undetermined.put(ackId.get(), flowFile);
+            }
+        }
+
+        if (undetermined.isEmpty()) {
+            getLogger().debug("There was no eligible flow file to send request 
to Splunk.");
+            return;
+        }
+
+        try {
+            requestMessage = createRequestMessage(undetermined);
+        } catch (final IOException e) {
+            getLogger().error("Could not prepare Splunk request!", e);
+            session.transfer(undetermined.values(), RELATIONSHIP_FAILURE);
+            return;
+        }
+
+        try {
+            final ResponseMessage responseMessage = call(ENDPOINT, 
requestMessage);
+
+            if (responseMessage.getStatus() == 200) {
+                final EventIndexStatusResponse splunkResponse = 
unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
+
+                splunkResponse.getAcks().entrySet().forEach(result -> {
+                    final FlowFile toTransfer = 
undetermined.get(result.getKey());
+
+                    if (result.getValue()) {
+                        session.transfer(toTransfer, 
RELATIONSHIP_ACKNOWLEDGED);
+                    } else {
+                        session.penalize(toTransfer);
+                        session.transfer(toTransfer, 
RELATIONSHIP_UNDETERMINED);
+                    }
+                });
+            } else {
+                getLogger().error("Query index status was not successful 
because of ({}) {}", new Object[] {responseMessage.getStatus(), 
responseMessage.getContent()});
+                context.yield();
+                session.transfer(undetermined.values(), 
RELATIONSHIP_UNDETERMINED);
+            }
+        } catch (final Exception e) {
+            getLogger().error("Error during communication with Splunk 
server!", e);

Review comment:
       Would generally recommend against exclamation points in logs

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>QuerySplunkIndexingStatus</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" 
type="text/css" />
+</head>
+
+<body>
+<h2>QuerySplunkIndexingStatus</h2>
+
+<p>
+    This processor is responsible for polling Splunk server and determine if a 
Splunk event is acknowledged at the time of
+    execution. For more details about the HEC Index Acknowledgement please see
+    <a 
href="https://docs.splunk.com/Documentation/Splunk/LATEST/Data/AboutHECIDXAck";>this
 documentation.</a>
+</p>
+
+<h3>Prerequisites</h3>
+
+<p>
+    In order to work properly, the incoming flow files need to have the 
attributes "splunk.acknowledgement.id" and
+    "splunk.responded.at" filled properly. The flow file attribute 
+    "splunk.acknowledgement.id" should contain the "ackId"
+    contained by the response of the Splunk from the original put call. The 
flow file attribute "splunk.responded.at"
+    should contain the Unix Epoch the put call was answered by Splunk. It is 
suggested to use PutSplunkHTTP processor to execute
+    the put call and set these attributes.
+</p>
+
+<h3>Unacknowledged and undetermined cases</h3>
+
+<p>
+    Splunk serves information only about successful acknowledgement. In every 
other case it will return a value of <code>false</code>. This
+    includes unsuccessful or ongoing indexing and unknown acknowledgement 
identifiers. In order to avoid infinite tries,
+    QuerySplunkIndexingStatus gives user the possibility to set a "Maximum 
waiting time". False results from Splunk within
+    the specified waiting time will be handled as "undetermined" and are 
transferred to the "undetermined" relationship.
+    Flow files outside of this time range will be transferred to the 
"unacknowledged" relationship next time the processor is
+    triggered. In order to determine if the indexing of a given event is 
within the waiting time, the Unix Epoch stored in
the "splunk.responded.at" attribute will be used. 
Setting "Maximum waiting time" too low might

Review comment:
       I don't see a "Splunk Sent At Attribute Name" anywhere. Was this 
removed? If so, should update the docs.

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.SendRawDataResponse;
+import org.apache.nifi.dto.splunk.SendRawDataSuccessResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http"})
+@CapabilityDescription("Sends flow file content to the specified Splunk server 
over HTTP or HTTPS. Supports HEC Index Acknowledgement.")
+@ReadsAttribute(attribute = "mime.type", description = "Uses as value for HTTP 
Content-Type header if set.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "splunk.acknowledgement.id", description 
= "The indexing acknowledgement id provided by Splunk."),
+        @WritesAttribute(attribute = "splunk.send.at", description = "The time 
of sending the put request for Splunk.")})
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@SeeAlso(QuerySplunkIndexingStatus.class)
+public class PutSplunkHTTP extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/raw";
+
+    static final PropertyDescriptor SOURCE = new PropertyDescriptor.Builder()
+            .name("source")
+            .displayName("Source")
+            .description("User-defined event source. Sets a default for all 
events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SOURCE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("source-type")
+            .displayName("Source Type")
+            .description("User-defined event sourcetype. Sets a default for 
all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("host")
+            .displayName("Host")
+            .description("Specify with the host query string parameter. Sets a 
default for all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("index")
+            .displayName("Index")
+            .description("Index name. Specify with the index query string 
parameter. Sets a default for all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("character-set")
+            .displayName("Character Set")
+            .description("The name of the character set.")
+            .required(true)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue(Charset.defaultCharset().name())
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor CONTENT_TYPE = new 
PropertyDescriptor.Builder()
+            .name("content-type")
+            .displayName("Content Type")
+            .description(
+                    "The media type of the event sent to Splunk. " +
+                    "If not set, \"mime.type\" flow file attribute will be 
used. " +
+                    "In case of neither of them is specified, this information 
will not be sent to the server.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are sent successfully to the 
destination are sent out this relationship.")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed to send to the destination are 
sent out this relationship.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_SUCCESS,
+            RELATIONSHIP_FAILURE)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        result.add(SOURCE);
+        result.add(SOURCE_TYPE);
+        result.add(HOST);
+        result.add(INDEX);
+        result.add(CONTENT_TYPE);
+        result.add(CHARSET);
+        return result;
+    }
+
+    private volatile String endpoint;
+    private volatile String contentType;
+    private volatile String charset;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+
+        if (context.getProperty(CONTENT_TYPE).isSet()) {
+            contentType = 
context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
+        }
+
+        charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
+
+        final Map<String, String> queryParameters = new HashMap<>();
+
+        if (context.getProperty(SOURCE_TYPE).isSet()) {
+            queryParameters.put("sourcetype", 
context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(SOURCE).isSet()) {
+            queryParameters.put("source", 
context.getProperty(SOURCE).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(HOST).isSet()) {
+            queryParameters.put("host", 
context.getProperty(HOST).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(INDEX).isSet()) {
+            queryParameters.put("index", 
context.getProperty(INDEX).evaluateAttributeExpressions().getValue());
+        }
+
+        endpoint = getEndpoint(queryParameters);
+    }
+
+    private String getEndpoint(final Map<String, String> queryParameters) {
+        if (queryParameters.isEmpty()) {
+            return ENDPOINT;
+        }
+
+        try {
+            return URLEncoder.encode(ENDPOINT + '?' + 
queryParameters.entrySet().stream().map(e -> e.getKey() + '=' + 
e.getValue()).collect(Collectors.joining("&")), "UTF-8");
+        } catch (final UnsupportedEncodingException e) {
+            getLogger().error("Could not be initialized because of: {}", new 
Object[] {e.getMessage()}, e);
+            throw new ProcessException(e);
+        }
+    }
+
+    @OnStopped
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        contentType = null;
+        charset = null;
+        endpoint = null;
+    }
+
    /**
     * Sends the incoming flow file's content to the Splunk HEC raw endpoint and
     * routes the flow file to success only when Splunk reports a zero response
     * code on an HTTP 200; every other outcome routes to failure. The HTTP
     * status is always recorded in the "splunk.status.code" attribute.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        boolean success = false;

        if (flowFile == null) {
            return;
        }

        try {
            final RequestMessage requestMessage = createRequestMessage(session, flowFile);
            final ResponseMessage responseMessage = call(endpoint, requestMessage);
            flowFile = session.putAttribute(flowFile, "splunk.status.code", String.valueOf(responseMessage.getStatus()));

            switch (responseMessage.getStatus()) {
                case 200:
                    final SendRawDataSuccessResponse successResponse = unmarshallResult(responseMessage.getContent(), SendRawDataSuccessResponse.class);

                    // Splunk signals application-level success with code 0 even on HTTP 200.
                    if (successResponse.getCode() == 0) {
                        flowFile = enrichFlowFile(session, flowFile, successResponse.getAckId());
                        success = true;
                    } else {
                        flowFile = session.putAttribute(flowFile, "splunk.response.code", String.valueOf(successResponse.getCode()));
                        getLogger().error("Putting data into Splunk was not successful: ({}) {}", new Object[] {successResponse.getCode(), successResponse.getText()});
                    }

                    break;
                case 503 : // HEC is unhealthy, queues are full
                    context.yield();
                    // fall-through: 503 is also logged and routed like any other failure status
                default:
                    final SendRawDataResponse response = unmarshallResult(responseMessage.getContent(), SendRawDataResponse.class);
                    getLogger().error("Putting data into Splunk was not successful: {}", new Object[] {response.getText()});
            }
        } catch (final Exception e) {
            getLogger().error("Error during communication with Splunk: {}", new Object[] {e.getMessage()}, e);
        } finally {
            // The flow file is always transferred exactly once, even after an exception.
            session.transfer(flowFile, success ? RELATIONSHIP_SUCCESS : RELATIONSHIP_FAILURE);
        }
    }
+
+    private RequestMessage createRequestMessage(final ProcessSession session, 
final FlowFile flowFile) {
+        final RequestMessage requestMessage = new RequestMessage("POST");
+        final String flowFileContentType = 
Optional.ofNullable(contentType).orElse(flowFile.getAttribute("mime.type"));
+
+        if (flowFileContentType != null) {
+            requestMessage.getHeader().put("Content-Type", 
flowFileContentType);
+        }
+
+        requestMessage.setContent(extractTextMessageBody(flowFile, session, 
charset));

Review comment:
       Thanks for that clarification. Probably makes sense to put a comment to 
that effect in the code so that it's understood later, as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to