markap14 commented on a change in pull request #4714: URL: https://github.com/apache/nifi/pull/4714#discussion_r563904084
########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.EventIndexStatusRequest; +import org.apache.nifi.dto.splunk.EventIndexStatusResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import 
org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http", "acknowledgement"}) +@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")}) +@SeeAlso(PutSplunkHTTP.class) +public class QuerySplunkIndexingStatus extends SplunkAPICall { + private static final String ENDPOINT = "/services/collector/ack"; + + static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship when the acknowledgement was successful.") + .build(); + + static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder() + .name("unacknowledged") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful." + + "This can happen when the acknowledgement did not happened within the time period set for Maximum Waiting Time. 
" + + "FlowFiles with acknowledgement id unknown for the Splunk server will be transferred to this relationship after the Maximum Waiting Time is reached.") + .build(); + + static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder() + .name("undetermined") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement state is not determined. " + + "FlowFiles transferred to this relationship might be penalized! " + Review comment: No reason to add an exclamation mark here - it is a very normal thing. Under what conditions would they be penalized? An indication that they "might be penalized" leaves me with more questions than answers, as a user. ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.EventIndexStatusRequest; +import org.apache.nifi.dto.splunk.EventIndexStatusResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http", "acknowledgement"}) +@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")}) +@SeeAlso(PutSplunkHTTP.class) +public class QuerySplunkIndexingStatus extends SplunkAPICall { + 
private static final String ENDPOINT = "/services/collector/ack"; + + static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship when the acknowledgement was successful.") + .build(); + + static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder() + .name("unacknowledged") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful." + Review comment: nit: Missing a space after the period. ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.EventIndexStatusRequest; +import org.apache.nifi.dto.splunk.EventIndexStatusResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http", "acknowledgement"}) +@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")}) +@SeeAlso(PutSplunkHTTP.class) +public class QuerySplunkIndexingStatus extends SplunkAPICall { + 
private static final String ENDPOINT = "/services/collector/ack"; + + static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship when the acknowledgement was successful.") + .build(); + + static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder() + .name("unacknowledged") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful." + + "This can happen when the acknowledgement did not happened within the time period set for Maximum Waiting Time. " + + "FlowFiles with acknowledgement id unknown for the Splunk server will be transferred to this relationship after the Maximum Waiting Time is reached.") + .build(); + + static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder() + .name("undetermined") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement state is not determined. " + + "FlowFiles transferred to this relationship might be penalized! " + + "This happens when Splunk returns with HTTP 200 but with false response for the acknowledgement id in the flow file attribute.") + .build(); + + static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful due to errors during the communication. 
" + + "FlowFiles are timing out or unknown by the Splunk server will transferred to \"undetermined\" relationship.") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + RELATIONSHIP_ACKNOWLEDGED, + RELATIONSHIP_UNACKNOWLEDGED, + RELATIONSHIP_UNDETERMINED, + RELATIONSHIP_FAILURE + ))); + + static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() + .name("ttl") + .displayName("Maximum Waiting Time") + .description( + "The maximum time the processor tries to acquire acknowledgement confirmation for an index, from the point of registration. " + + "After the given amount of time, the processor considers the index as not acknowledged and transfers the FlowFile to the \"unacknowledged\" relationship.") + .defaultValue("1 hour") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder() + .name("max-query-size") + .displayName("Maximum Query Size") + .description( + "The maximum number of acknowledgement identifiers the outgoing query contains in one batch. 
" + + "It is recommended not to set it too low in order to reduce network communication.") + .defaultValue("10000") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + private volatile Integer maxQuerySize; + private volatile Integer ttl; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> result = new ArrayList<>(); + final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors(); + result.addAll(common); + result.add(TTL); + result.add(MAX_QUERY_SIZE); + return result; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger(); + ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + } + + @OnStopped + public void onUnscheduled() { + super.onUnscheduled(); + maxQuerySize = null; + ttl = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final RequestMessage requestMessage; + final List<FlowFile> flowFiles = session.get(maxQuerySize); + + if (flowFiles.isEmpty()) { + return; + } + + final long currentTime = System.currentTimeMillis(); + final Map<Long, FlowFile> undetermined = new HashMap<>(); + + for (final FlowFile flowFile : flowFiles) { + final Optional<Long> sentAt = extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)); + final Optional<Long> ackId = extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE)); + + if (!sentAt.isPresent() || !ackId.isPresent()) { + getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!", + new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE}); 
+ session.transfer(flowFile, RELATIONSHIP_FAILURE); + } else if (sentAt.get() + ttl < currentTime) { + session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED); Review comment: This seems like the wrong behavior to me. If the data is put to Splunk, and then sent to this processor, but sits in the queue for a while (due to the processor being stopped or having a long backlog, etc.), then the FlowFile will immediately go to Unacknowledged, even though Splunk was never queried for the status. Are we sure that's what we want to happen here? ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.SendRawDataResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http"}) +@CapabilityDescription("Sends flow file content to the specified Splunk server over HTTP or HTTPS. 
Supports HEC Index Acknowledgement.") +@ReadsAttribute(attribute = "mime.type", description = "Uses as value for HTTP Content-Type header if set.") +@WritesAttributes({ + @WritesAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @WritesAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")}) +@SystemResourceConsideration(resource = SystemResource.MEMORY) +@SeeAlso(QuerySplunkIndexingStatus.class) +public class PutSplunkHTTP extends SplunkAPICall { + private static final String ENDPOINT = "/services/collector/raw"; + + static final PropertyDescriptor SOURCE = new PropertyDescriptor.Builder() + .name("source") + .displayName("Source") + .description("User-defined event source. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() + .name("source-type") + .displayName("Source Type") + .description("User-defined event sourcetype. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() + .name("host") + .displayName("Host") + .description("Specify with the host query string parameter. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("index") + .displayName("Index") + .description("Index name. 
Specify with the index query string parameter. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("character-set") + .displayName("Character Set") + .description("The name of the character set.") + .required(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue(Charset.defaultCharset().name()) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() + .name("content-type") + .displayName("Content Type") + .description( + "The media type of the event sent to Splunk. " + + "If not set, \"mime.type\" flow file attribute will be used. " + + "In case of neither of them is specified, this information will not be sent to the server.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are sent successfully to the destination are sent to this relationship.") + .build(); + + static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to send to the destination are sent to this relationship.") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + RELATIONSHIP_SUCCESS, + RELATIONSHIP_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> result = new 
ArrayList<>(super.getSupportedPropertyDescriptors()); + result.add(SOURCE); + result.add(SOURCE_TYPE); + result.add(HOST); + result.add(INDEX); + result.add(CONTENT_TYPE); + result.add(CHARSET); + return result; + } + + private volatile String endpoint; + private volatile String contentType; + private volatile String charset; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + + if (context.getProperty(CONTENT_TYPE).isSet()) { + contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue(); + } + + charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); + + final Map<String, String> queryParameters = new HashMap<>(); + + if (context.getProperty(SOURCE_TYPE).isSet()) { + queryParameters.put("sourcetype", context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(SOURCE).isSet()) { + queryParameters.put("source", context.getProperty(SOURCE).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(HOST).isSet()) { + queryParameters.put("host", context.getProperty(HOST).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(INDEX).isSet()) { + queryParameters.put("index", context.getProperty(INDEX).evaluateAttributeExpressions().getValue()); + } + + endpoint = getEndpoint(queryParameters); + } + + private String getEndpoint(final Map<String, String> queryParameters) { + if (queryParameters.isEmpty()) { + return ENDPOINT; + } + + try { + return URLEncoder.encode(ENDPOINT + '?' 
+ queryParameters.entrySet().stream().map(e -> e.getKey() + '=' + e.getValue()).collect(Collectors.joining("&")), "UTF-8"); + } catch (final UnsupportedEncodingException e) { + getLogger().error("Could not be initialized because of: {}", new Object[] {e.getMessage()}, e); + throw new ProcessException(e); + } + } + + @OnStopped + public void onUnscheduled() { Review comment: Probably should rename method to `onStopped` instead of `onUnscheduled` ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.EventIndexStatusRequest; +import org.apache.nifi.dto.splunk.EventIndexStatusResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http", "acknowledgement"}) +@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")}) +@SeeAlso(PutSplunkHTTP.class) +public class QuerySplunkIndexingStatus extends SplunkAPICall { + 
private static final String ENDPOINT = "/services/collector/ack"; + + static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship when the acknowledgement was successful.") + .build(); + + static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder() + .name("unacknowledged") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful." + + "This can happen when the acknowledgement did not happened within the time period set for Maximum Waiting Time. " + + "FlowFiles with acknowledgement id unknown for the Splunk server will be transferred to this relationship after the Maximum Waiting Time is reached.") + .build(); + + static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder() + .name("undetermined") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement state is not determined. " + + "FlowFiles transferred to this relationship might be penalized! " + + "This happens when Splunk returns with HTTP 200 but with false response for the acknowledgement id in the flow file attribute.") + .build(); + + static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful due to errors during the communication. 
" + + "FlowFiles are timing out or unknown by the Splunk server will transferred to \"undetermined\" relationship.") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + RELATIONSHIP_ACKNOWLEDGED, + RELATIONSHIP_UNACKNOWLEDGED, + RELATIONSHIP_UNDETERMINED, + RELATIONSHIP_FAILURE + ))); + + static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() + .name("ttl") + .displayName("Maximum Waiting Time") + .description( + "The maximum time the processor tries to acquire acknowledgement confirmation for an index, from the point of registration. " + + "After the given amount of time, the processor considers the index as not acknowledged and transfers the FlowFile to the \"unacknowledged\" relationship.") + .defaultValue("1 hour") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder() + .name("max-query-size") + .displayName("Maximum Query Size") + .description( + "The maximum number of acknowledgement identifiers the outgoing query contains in one batch. 
" + + "It is recommended not to set it too low in order to reduce network communication.") + .defaultValue("10000") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + private volatile Integer maxQuerySize; + private volatile Integer ttl; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> result = new ArrayList<>(); + final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors(); + result.addAll(common); + result.add(TTL); + result.add(MAX_QUERY_SIZE); + return result; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger(); + ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + } + + @OnStopped + public void onUnscheduled() { Review comment: Probably should name the method `onStopped` rather than `onUnscheduled`. ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html ########## @@ -0,0 +1,76 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>QuerySplunkIndexingStatus</title> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>QuerySplunkIndexingStatus</h2> + +<p> + This processor is responsible for polling the Splunk server and determining whether a Splunk event is acknowledged at the time of + execution. For more details about the HEC Index Acknowledgement please see + <a href="https://docs.splunk.com/Documentation/Splunk/LATEST/Data/AboutHECIDXAck">this documentation.</a> +</p> + +<h3>Prerequisites</h3> + +<p> + In order to work properly, the incoming flow files need to have the attributes "splunk.acknowledgement.id" and + "splunk.responded.at" filled properly. The flow file attribute "splunk.acknowledgement.id" should contain the "ackId" + contained in the response of Splunk from the original put call. The flow file attribute "splunk.responded.at" + should contain the Unix Epoch the put call was answered by Splunk. It is suggested to use PutSplunkHTTP processor to execute + the put call and set these attributes. +</p> + +<h3>Unacknowledged and undetermined cases</h3> + +<p> + Splunk serves information only about successful acknowledgement. In every other case it will return a false value. This Review comment: Referring to this here as a "false value" makes it sound like a "fake value" or an "invalid value". Would recommend instead "... it will return a value of <code>false</code>." 
########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.splunk; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.splunk.HttpException; +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import com.splunk.SSLSecurityProtocol; +import com.splunk.Service; +import com.splunk.ServiceArgs; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; + +abstract class SplunkAPICall extends AbstractProcessor { + private static final String 
REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel"; + + private static final String HTTP_SCHEME = "http"; + private static final String HTTPS_SCHEME = "https"; + + private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name()); + private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name()); + private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name()); + private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name()); + + static final String ACKNOWLEDGEMENT_ID_ATTRIBUTE = "splunk.acknowledgement.id"; + static final String RESPONDED_AT_ATTRIBUTE = "splunk.responded.at"; + + static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder() + .name("Scheme") + .description("The scheme for connecting to Splunk.") + .allowableValues(HTTPS_SCHEME, HTTP_SCHEME) + .defaultValue(HTTPS_SCHEME) + .required(true) + .build(); + + static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The ip address or hostname of the Splunk server.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("localhost") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The HTTP Port Number of the Splunk server.") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("9088") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() + .name("Security Protocol") + .description("The security protocol to 
use for communicating with Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE) + .defaultValue(TLS_1_2_VALUE.getValue()) + .build(); + + static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder() + .name("Owner") + .description("The owner to pass to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder() + .name("Token") + .description("The token to pass to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("Username") + .description("The username to authenticate to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("The password to authenticate to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .sensitive(true) + .build(); + + static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder() + .name("request-channel") + .displayName("Splunk Request Channel") + .description("Identifier of the used request channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList( + SCHEME, + HOSTNAME, + PORT, + SECURITY_PROTOCOL, + OWNER, + TOKEN, + USERNAME, + PASSWORD, + REQUEST_CHANNEL + ); + + private final 
JsonFactory jsonFactory = new JsonFactory(); + private final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory); + + private volatile ServiceArgs splunkServiceArguments; + private volatile Service splunkService; + private volatile String requestChannel; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return SplunkAPICall.PROPERTIES; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + splunkServiceArguments = getSplunkServiceArgs(context); + splunkService = getSplunkService(splunkServiceArguments); + requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue(); + } + + private ServiceArgs getSplunkServiceArgs(final ProcessContext context) { + final ServiceArgs splunkServiceArguments = new ServiceArgs(); + + splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue()); + splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue()); + splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger()); + + if (context.getProperty(OWNER).isSet()) { + splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(TOKEN).isSet()) { + splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(USERNAME).isSet()) { + splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(PASSWORD).isSet()) { + splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue()); + } + + if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) { + splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue())); + } + + return 
splunkServiceArguments; + } + + protected Service getSplunkService(final ServiceArgs splunkServiceArguments) { + return Service.connect(splunkServiceArguments); + } + + @OnStopped + public void onUnscheduled() { Review comment: Should rename method to `onStopped` ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.EventIndexStatusRequest; +import org.apache.nifi.dto.splunk.EventIndexStatusResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http", "acknowledgement"}) +@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")}) +@SeeAlso(PutSplunkHTTP.class) +public class QuerySplunkIndexingStatus extends SplunkAPICall { + 
private static final String ENDPOINT = "/services/collector/ack"; + + static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship when the acknowledgement was successful.") + .build(); + + static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder() + .name("unacknowledged") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful." + + "This can happen when the acknowledgement did not happened within the time period set for Maximum Waiting Time. " + + "FlowFiles with acknowledgement id unknown for the Splunk server will be transferred to this relationship after the Maximum Waiting Time is reached.") + .build(); + + static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder() + .name("undetermined") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement state is not determined. " + + "FlowFiles transferred to this relationship might be penalized! " + + "This happens when Splunk returns with HTTP 200 but with false response for the acknowledgement id in the flow file attribute.") Review comment: Not entirely sure what is being said here. "with a false response" sounds like the Splunk server is responding with an invalid or a "fake" response. Perhaps rephrasing to something along the lines of "but with a response indicating that the id has not been acknowledged"? I *think* that's what this is intended to convey. ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.PutSplunkHTTP/additionalDetails.html ########## @@ -0,0 +1,75 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>PutSplunkHTTP</title> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>PutSplunkHTTP</h2> + +<p> + This processor serves as a counterpart for PutSplunk processor. While the latter solves communication using TCP and + UDP protocols, PutSplunkHTTP aims to send events into Splunk via HTTP or HTTPS. In this fashion, this processor + shows similarities with GetSplunk processor and the properties relevant to the connection with Splunk server are + identical. There are however some aspects unique for this processor: +</p> + +<h3>Content details</h3> + +<p> + PutSplunkHTTP allows the user to specify some metadata about the event is being sent to the Splunk. These include: the Review comment: typo: should be "some metadata about the event being sent" - reads "some metadata about the event _is_ being sent" ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.EventIndexStatusRequest; +import org.apache.nifi.dto.splunk.EventIndexStatusResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + 
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http", "acknowledgement"}) +@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")}) +@SeeAlso(PutSplunkHTTP.class) +public class QuerySplunkIndexingStatus extends SplunkAPICall { + private static final String ENDPOINT = "/services/collector/ack"; + + static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship when the acknowledgement was successful.") + .build(); + + static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder() + .name("unacknowledged") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful." + + "This can happen when the acknowledgement did not happened within the time period set for Maximum Waiting Time. " + + "FlowFiles with acknowledgement id unknown for the Splunk server will be transferred to this relationship after the Maximum Waiting Time is reached.") + .build(); + + static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder() + .name("undetermined") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement state is not determined. " + + "FlowFiles transferred to this relationship might be penalized! 
" + + "This happens when Splunk returns with HTTP 200 but with false response for the acknowledgement id in the flow file attribute.") + .build(); + + static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful due to errors during the communication. " + + "FlowFiles are timing out or unknown by the Splunk server will transferred to \"undetermined\" relationship.") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + RELATIONSHIP_ACKNOWLEDGED, + RELATIONSHIP_UNACKNOWLEDGED, + RELATIONSHIP_UNDETERMINED, + RELATIONSHIP_FAILURE + ))); + + static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() + .name("ttl") + .displayName("Maximum Waiting Time") + .description( + "The maximum time the processor tries to acquire acknowledgement confirmation for an index, from the point of registration. " + + "After the given amount of time, the processor considers the index as not acknowledged and transfers the FlowFile to the \"unacknowledged\" relationship.") + .defaultValue("1 hour") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder() + .name("max-query-size") + .displayName("Maximum Query Size") + .description( + "The maximum number of acknowledgement identifiers the outgoing query contains in one batch. 
" + + "It is recommended not to set it too low in order to reduce network communication.") + .defaultValue("10000") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + private volatile Integer maxQuerySize; + private volatile Integer ttl; + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> result = new ArrayList<>(); + final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors(); + result.addAll(common); + result.add(TTL); + result.add(MAX_QUERY_SIZE); + return result; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger(); + ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + } + + @OnStopped + public void onUnscheduled() { + super.onUnscheduled(); + maxQuerySize = null; + ttl = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final RequestMessage requestMessage; + final List<FlowFile> flowFiles = session.get(maxQuerySize); + + if (flowFiles.isEmpty()) { + return; + } + + final long currentTime = System.currentTimeMillis(); + final Map<Long, FlowFile> undetermined = new HashMap<>(); + + for (final FlowFile flowFile : flowFiles) { + final Optional<Long> sentAt = extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)); + final Optional<Long> ackId = extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE)); + + if (!sentAt.isPresent() || !ackId.isPresent()) { + getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!", + new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE}); 
+ session.transfer(flowFile, RELATIONSHIP_FAILURE); + } else if (sentAt.get() + ttl < currentTime) { + session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED); + } else { + undetermined.put(ackId.get(), flowFile); + } + } + + if (undetermined.isEmpty()) { + getLogger().debug("There was no eligible flow file to send request to Splunk."); + return; + } + + try { + requestMessage = createRequestMessage(undetermined); + } catch (final IOException e) { + getLogger().error("Could not prepare Splunk request!", e); + session.transfer(undetermined.values(), RELATIONSHIP_FAILURE); + return; + } + + try { + final ResponseMessage responseMessage = call(ENDPOINT, requestMessage); + + if (responseMessage.getStatus() == 200) { + final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class); + + splunkResponse.getAcks().entrySet().forEach(result -> { + final FlowFile toTransfer = undetermined.get(result.getKey()); + + if (result.getValue()) { + session.transfer(toTransfer, RELATIONSHIP_ACKNOWLEDGED); + } else { + session.penalize(toTransfer); + session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED); + } + }); + } else { + getLogger().error("Query index status was not successful because of ({}) {}", new Object[] {responseMessage.getStatus(), responseMessage.getContent()}); + context.yield(); + session.transfer(undetermined.values(), RELATIONSHIP_UNDETERMINED); + } + } catch (final Exception e) { + getLogger().error("Error during communication with Splunk server!", e); Review comment: Would generally recommend against exclamation points in logs ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html ########## @@ -0,0 +1,76 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>QuerySplunkIndexingStatus</title> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> +<h2>QuerySplunkIndexingStatus</h2> + +<p> + This processor is responsible for polling the Splunk server and determining whether a Splunk event is acknowledged at the time of + execution. For more details about the HEC Index Acknowledgement please see + <a href="https://docs.splunk.com/Documentation/Splunk/LATEST/Data/AboutHECIDXAck">this documentation.</a> +</p> + +<h3>Prerequisites</h3> + +<p> + In order to work properly, the incoming flow files need to have the attributes "splunk.acknowledgement.id" and + "splunk.responded.at" filled properly. The flow file attribute "splunk.acknowledgement.id" should contain the "ackId" + contained in the response of Splunk from the original put call. The flow file attribute "splunk.responded.at" + should contain the Unix Epoch the put call was answered by Splunk. It is suggested to use PutSplunkHTTP processor to execute + the put call and set these attributes. +</p> + +<h3>Unacknowledged and undetermined cases</h3> + +<p> + Splunk serves information only about successful acknowledgement. In every other case it will return a false value. 
This + includes unsuccessful or ongoing indexing and unknown acknowledgement identifiers. In order to avoid infinite retries, + QuerySplunkIndexingStatus gives the user the possibility to set a "Maximum waiting time". False results from Splunk within + the specified waiting time will be handled as "undetermined" and are transferred to the "undetermined" relationship. + Flow files outside of this time range will be transferred to the "unacknowledged" relationship the next time the processor is + triggered. In order to determine whether the indexing of a given event is within the waiting time, the Unix Epoch stored in + the attribute specified by "Splunk Sent At Attribute Name" will be used. Setting "Maximum waiting time" too low might Review comment: I don't see a "Splunk Sent At Attribute Name" anywhere. Was this removed? If so, should update the docs. ########## File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.SendRawDataResponse; +import org.apache.nifi.dto.splunk.SendRawDataSuccessResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.StringWriter; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http"}) +@CapabilityDescription("Sends flow file content to the specified Splunk server over HTTP or 
HTTPS. Supports HEC Index Acknowledgement.") +@ReadsAttribute(attribute = "mime.type", description = "Uses as value for HTTP Content-Type header if set.") +@WritesAttributes({ + @WritesAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @WritesAttribute(attribute = "splunk.send.at", description = "The time of sending the put request for Splunk.")}) +@SystemResourceConsideration(resource = SystemResource.MEMORY) +@SeeAlso(QuerySplunkIndexingStatus.class) +public class PutSplunkHTTP extends SplunkAPICall { + private static final String ENDPOINT = "/services/collector/raw"; + + static final PropertyDescriptor SOURCE = new PropertyDescriptor.Builder() + .name("source") + .displayName("Source") + .description("User-defined event source. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() + .name("source-type") + .displayName("Source Type") + .description("User-defined event sourcetype. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() + .name("host") + .displayName("Host") + .description("Specify with the host query string parameter. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("index") + .displayName("Index") + .description("Index name. Specify with the index query string parameter. 
Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("character-set") + .displayName("Character Set") + .description("The name of the character set.") + .required(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue(Charset.defaultCharset().name()) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() + .name("content-type") + .displayName("Content Type") + .description( + "The media type of the event sent to Splunk. " + + "If not set, \"mime.type\" flow file attribute will be used. " + + "In case of neither of them is specified, this information will not be sent to the server.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are sent successfully to the destination are sent out this relationship.") + .build(); + + static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to send to the destination are sent out this relationship.") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + RELATIONSHIP_SUCCESS, + RELATIONSHIP_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> result = new 
ArrayList<>(super.getSupportedPropertyDescriptors()); + result.add(SOURCE); + result.add(SOURCE_TYPE); + result.add(HOST); + result.add(INDEX); + result.add(CONTENT_TYPE); + result.add(CHARSET); + return result; + } + + private volatile String endpoint; + private volatile String contentType; + private volatile String charset; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + + if (context.getProperty(CONTENT_TYPE).isSet()) { + contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue(); + } + + charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); + + final Map<String, String> queryParameters = new HashMap<>(); + + if (context.getProperty(SOURCE_TYPE).isSet()) { + queryParameters.put("sourcetype", context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(SOURCE).isSet()) { + queryParameters.put("source", context.getProperty(SOURCE).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(HOST).isSet()) { + queryParameters.put("host", context.getProperty(HOST).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(INDEX).isSet()) { + queryParameters.put("index", context.getProperty(INDEX).evaluateAttributeExpressions().getValue()); + } + + endpoint = getEndpoint(queryParameters); + } + + private String getEndpoint(final Map<String, String> queryParameters) { + if (queryParameters.isEmpty()) { + return ENDPOINT; + } + + try { + return URLEncoder.encode(ENDPOINT + '?' 
+ queryParameters.entrySet().stream().map(e -> e.getKey() + '=' + e.getValue()).collect(Collectors.joining("&")), "UTF-8"); + } catch (final UnsupportedEncodingException e) { + getLogger().error("Could not be initialized because of: {}", new Object[] {e.getMessage()}, e); + throw new ProcessException(e); + } + } + + @OnStopped + public void onUnscheduled() { + super.onUnscheduled(); + contentType = null; + charset = null; + endpoint = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + boolean success = false; + + if (flowFile == null) { + return; + } + + try { + final RequestMessage requestMessage = createRequestMessage(session, flowFile); + final ResponseMessage responseMessage = call(endpoint, requestMessage); + flowFile = session.putAttribute(flowFile, "splunk.status.code", String.valueOf(responseMessage.getStatus())); + + switch (responseMessage.getStatus()) { + case 200: + final SendRawDataSuccessResponse successResponse = unmarshallResult(responseMessage.getContent(), SendRawDataSuccessResponse.class); + + if (successResponse.getCode() == 0) { + flowFile = enrichFlowFile(session, flowFile, successResponse.getAckId()); + success = true; + } else { + flowFile = session.putAttribute(flowFile, "splunk.response.code", String.valueOf(successResponse.getCode())); + getLogger().error("Putting data into Splunk was not successful: ({}) {}", new Object[] {successResponse.getCode(), successResponse.getText()}); + } + + break; + case 503 : // HEC is unhealthy, queues are full + context.yield(); + // fall-through + default: + final SendRawDataResponse response = unmarshallResult(responseMessage.getContent(), SendRawDataResponse.class); + getLogger().error("Putting data into Splunk was not successful: {}", new Object[] {response.getText()}); + } + } catch (final Exception e) { + getLogger().error("Error during communication with Splunk: {}", new Object[] 
{e.getMessage()}, e); + } finally { + session.transfer(flowFile, success ? RELATIONSHIP_SUCCESS : RELATIONSHIP_FAILURE); + } + } + + private RequestMessage createRequestMessage(final ProcessSession session, final FlowFile flowFile) { + final RequestMessage requestMessage = new RequestMessage("POST"); + final String flowFileContentType = Optional.ofNullable(contentType).orElse(flowFile.getAttribute("mime.type")); + + if (flowFileContentType != null) { + requestMessage.getHeader().put("Content-Type", flowFileContentType); + } + + requestMessage.setContent(extractTextMessageBody(flowFile, session, charset)); Review comment: Thanks for that clarification. Probably makes sense to put a comment to that effect in the code so that it's understood later, as well. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
