http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java new file mode 100644 index 0000000..b9d9e0b --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/GetSplunk.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + + +import com.splunk.JobExportArgs; +import com.splunk.SSLSecurityProtocol; +import com.splunk.Service; +import com.splunk.ServiceArgs; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnRemoved; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.BufferedOutputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; 
+import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +@TriggerSerially +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"get", "splunk", "logs"}) +@CapabilityDescription("Retrieves data from Splunk Enterprise.") +@WritesAttributes({ + @WritesAttribute(attribute="splunk.query", description = "The query that performed to produce the FlowFile."), + @WritesAttribute(attribute="splunk.earliest.time", description = "The value of the earliest time that was used when performing the query."), + @WritesAttribute(attribute="splunk.latest.time", description = "The value of the latest time that was used when performing the query.") +}) +@Stateful(scopes = Scope.CLUSTER, description = "If using one of the managed Time Range Strategies, this processor will " + + "store the values of the latest and earliest times from the previous execution so that the next execution of the " + + "can pick up where the last execution left off. 
The state will be cleared and start over if the query is changed.") +public class GetSplunk extends AbstractProcessor { + + public static final String HTTP_SCHEME = "http"; + public static final String HTTPS_SCHEME = "https"; + + public static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder() + .name("Scheme") + .description("The scheme for connecting to Splunk.") + .allowableValues(HTTPS_SCHEME, HTTP_SCHEME) + .defaultValue(HTTPS_SCHEME) + .required(true) + .build(); + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The ip address or hostname of the Splunk server.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("localhost") + .required(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The port of the Splunk server.") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("8089") + .build(); + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("Query") + .description("The query to execute. 
Typically beginning with a <search> command followed by a search clause, " + + "such as <search source=\"tcp:7689\"> to search for messages received on TCP port 7689.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("search * | head 100") + .required(true) + .build(); + + public static final AllowableValue MANAGED_BEGINNING_VALUE = new AllowableValue("Managed from Beginning", "Managed from Beginning", + "The processor will manage the date ranges of the query starting from the beginning of time."); + public static final AllowableValue MANAGED_CURRENT_VALUE = new AllowableValue("Managed from Current", "Managed from Current", + "The processor will manage the date ranges of the query starting from the current time."); + public static final AllowableValue PROVIDED_VALUE = new AllowableValue("Provided", "Provided", + "The the time range provided through the Earliest Time and Latest Time properties will be used."); + + public static final PropertyDescriptor TIME_RANGE_STRATEGY = new PropertyDescriptor.Builder() + .name("Time Range Strategy") + .description("Indicates how to apply time ranges to each execution of the query. Selecting a managed option " + + "allows the processor to apply a time range from the last execution time to the current execution time. " + + "When using <Managed from Beginning>, an earliest time will not be applied on the first execution, and thus all " + + "records searched. When using <Managed from Current> the earliest time of the first execution will be the " + + "initial execution time. 
When using <Provided>, the time range will come from the Earliest Time and Latest Time " + + "properties, or no time range will be applied if these properties are left blank.") + .allowableValues(MANAGED_BEGINNING_VALUE, MANAGED_CURRENT_VALUE, PROVIDED_VALUE) + .defaultValue(PROVIDED_VALUE.getValue()) + .required(true) + .build(); + + public static final PropertyDescriptor EARLIEST_TIME = new PropertyDescriptor.Builder() + .name("Earliest Time") + .description("The value to use for the earliest time when querying. Only used with a Time Range Strategy of Provided. " + + "See Splunk's documentation on Search Time Modifiers for guidance in populating this field.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + public static final PropertyDescriptor LATEST_TIME = new PropertyDescriptor.Builder() + .name("Latest Time") + .description("The value to use for the latest time when querying. Only used with a Time Range Strategy of Provided. " + + "See Splunk's documentation on Search Time Modifiers for guidance in populating this field.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + public static final PropertyDescriptor APP = new PropertyDescriptor.Builder() + .name("Application") + .description("The Splunk Application to query.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder() + .name("Owner") + .description("The owner to pass to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + public static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder() + .name("Token") + .description("The token to pass to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("Username") + 
.description("The username to authenticate to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("The password to authenticate to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .sensitive(true) + .build(); + + public static final AllowableValue ATOM_VALUE = new AllowableValue(JobExportArgs.OutputMode.ATOM.name(), JobExportArgs.OutputMode.ATOM.name()); + public static final AllowableValue CSV_VALUE = new AllowableValue(JobExportArgs.OutputMode.CSV.name(), JobExportArgs.OutputMode.CSV.name()); + public static final AllowableValue JSON_VALUE = new AllowableValue(JobExportArgs.OutputMode.JSON.name(), JobExportArgs.OutputMode.JSON.name()); + public static final AllowableValue JSON_COLS_VALUE = new AllowableValue(JobExportArgs.OutputMode.JSON_COLS.name(), JobExportArgs.OutputMode.JSON_COLS.name()); + public static final AllowableValue JSON_ROWS_VALUE = new AllowableValue(JobExportArgs.OutputMode.JSON_ROWS.name(), JobExportArgs.OutputMode.JSON_ROWS.name()); + public static final AllowableValue RAW_VALUE = new AllowableValue(JobExportArgs.OutputMode.RAW.name(), JobExportArgs.OutputMode.RAW.name()); + public static final AllowableValue XML_VALUE = new AllowableValue(JobExportArgs.OutputMode.XML.name(), JobExportArgs.OutputMode.XML.name()); + + public static final PropertyDescriptor OUTPUT_MODE = new PropertyDescriptor.Builder() + .name("Output Mode") + .description("The output mode for the results.") + .allowableValues(ATOM_VALUE, CSV_VALUE, JSON_VALUE, JSON_COLS_VALUE, JSON_ROWS_VALUE, RAW_VALUE, XML_VALUE) + .defaultValue(JSON_VALUE.getValue()) + .required(true) + .build(); + + public static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name()); + public static final AllowableValue TLS_1_1_VALUE = 
new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name()); + public static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name()); + public static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name()); + + public static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() + .name("Security Protocol") + .description("The security protocol to use for communicating with Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE) + .defaultValue(TLS_1_2_VALUE.getValue()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Results retrieved from Splunk are sent out this relationship.") + .build(); + + public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + public static final String EARLIEST_TIME_KEY = "earliestTime"; + public static final String LATEST_TIME_KEY = "latestTime"; + + public static final String QUERY_ATTR = "splunk.query"; + public static final String EARLIEST_TIME_ATTR = "splunk.earliest.time"; + public static final String LATEST_TIME_ATTR = "splunk.latest.time"; + + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + + private volatile String transitUri; + private volatile boolean resetState = false; + private volatile Service splunkService; + protected final AtomicBoolean isInitialized = new AtomicBoolean(false); + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(SCHEME); + descriptors.add(HOSTNAME); + descriptors.add(PORT); + descriptors.add(QUERY); + descriptors.add(TIME_RANGE_STRATEGY); + descriptors.add(EARLIEST_TIME); + 
descriptors.add(LATEST_TIME); + descriptors.add(APP); + descriptors.add(OWNER); + descriptors.add(TOKEN); + descriptors.add(USERNAME); + descriptors.add(PASSWORD); + descriptors.add(SECURITY_PROTOCOL); + descriptors.add(OUTPUT_MODE); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public final Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final Collection<ValidationResult> results = new ArrayList<>(); + + final String scheme = validationContext.getProperty(SCHEME).getValue(); + final String secProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue(); + + if (HTTPS_SCHEME.equals(scheme) && StringUtils.isBlank(secProtocol)) { + results.add(new ValidationResult.Builder() + .explanation("Security Protocol must be specified when using HTTPS") + .valid(false).subject("Security Protocol").build()); + } + + final String username = validationContext.getProperty(USERNAME).getValue(); + final String password = validationContext.getProperty(PASSWORD).getValue(); + + if (!StringUtils.isBlank(username) && StringUtils.isBlank(password)) { + results.add(new ValidationResult.Builder() + .explanation("Password must be specified when providing a Username") + .valid(false).subject("Password").build()); + } + + return results; + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + if ( ((oldValue != null && !oldValue.equals(newValue)) || (oldValue == null && newValue != null)) + && (descriptor.equals(QUERY) + || descriptor.equals(TIME_RANGE_STRATEGY) + || 
descriptor.equals(EARLIEST_TIME) + || descriptor.equals(LATEST_TIME) + || descriptor.equals(HOSTNAME)) + ) { + resetState = true; + } + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + final String scheme = context.getProperty(SCHEME).getValue(); + final String host = context.getProperty(HOSTNAME).getValue(); + final int port = context.getProperty(PORT).asInteger(); + transitUri = new StringBuilder().append(scheme).append("://").append(host).append(":").append(port).toString(); + + // if properties changed since last execution then remove any previous state + if (resetState) { + try { + context.getStateManager().clear(Scope.CLUSTER); + } catch (final IOException ioe) { + getLogger().warn("Failed to clear state", ioe); + } + resetState = false; + } + } + + @OnStopped + public void onStopped() { + if (splunkService != null) { + isInitialized.set(false); + splunkService.logout(); + splunkService = null; + } + } + + @OnRemoved + public void onRemoved(final ProcessContext context) { + try { + context.getStateManager().clear(Scope.CLUSTER); + } catch (IOException e) { + getLogger().error("Unable to clear processor state due to {}", new Object[] {e.getMessage()}, e); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final long currentTime = System.currentTimeMillis(); + + synchronized (isInitialized) { + if (!isInitialized.get()) { + splunkService = createSplunkService(context); + isInitialized.set(true); + } + } + + final String query = context.getProperty(QUERY).getValue(); + final String outputMode = context.getProperty(OUTPUT_MODE).getValue(); + final String timeRangeStrategy = context.getProperty(TIME_RANGE_STRATEGY).getValue(); + + final JobExportArgs exportArgs = new JobExportArgs(); + exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL); + exportArgs.setOutputMode(JobExportArgs.OutputMode.valueOf(outputMode)); + + String earliestTime = null; + String 
latestTime = null; + + if (PROVIDED_VALUE.getValue().equals(timeRangeStrategy)) { + // for provided we just use the values of the properties + earliestTime = context.getProperty(EARLIEST_TIME).getValue(); + latestTime = context.getProperty(LATEST_TIME).getValue(); + } else { + try { + // not provided so we need to check the previous state + final TimeRange previousRange = loadState(context.getStateManager()); + final SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_TIME_FORMAT); + + if (previousRange == null) { + // no previous state so set the earliest time based on the strategy + if (MANAGED_CURRENT_VALUE.getValue().equals(timeRangeStrategy)) { + earliestTime = dateFormat.format(new Date(currentTime)); + } + + // no previous state so set the latest time to the current time + latestTime = dateFormat.format(new Date(currentTime)); + + // if its the first time through don't actually run, just save the state to get the + // initial time saved and next execution will be the first real execution + if (latestTime.equals(earliestTime)) { + saveState(context.getStateManager(), new TimeRange(earliestTime, latestTime)); + return; + } + + } else { + // we have previous state so set earliestTime to latestTime of last range + earliestTime = previousRange.getLatestTime(); + latestTime = dateFormat.format(new Date(currentTime)); + } + + } catch (IOException e) { + getLogger().error("Unable to load data from State Manager due to {}", new Object[] {e.getMessage()}, e); + context.yield(); + return; + } + } + + if (!StringUtils.isBlank(earliestTime)) { + exportArgs.setEarliestTime(earliestTime); + } + + if (!StringUtils.isBlank(latestTime)) { + exportArgs.setLatestTime(latestTime); + } + + getLogger().debug("Using earliestTime of {} and latestTime of {}", new Object[] {earliestTime, latestTime}); + + final InputStream exportSearch = splunkService.export(query, exportArgs); + + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new 
OutputStreamCallback() { + @Override + public void process(OutputStream rawOut) throws IOException { + try (BufferedOutputStream out = new BufferedOutputStream(rawOut)) { + IOUtils.copyLarge(exportSearch, out); + } + } + }); + + final Map<String,String> attributes = new HashMap<>(3); + attributes.put(EARLIEST_TIME_ATTR, earliestTime); + attributes.put(LATEST_TIME_ATTR, latestTime); + attributes.put(QUERY_ATTR, query); + flowFile = session.putAllAttributes(flowFile, attributes); + + session.getProvenanceReporter().receive(flowFile, transitUri); + session.transfer(flowFile, REL_SUCCESS); + getLogger().debug("Received {} from Splunk", new Object[] {flowFile}); + + // save the time range for the next execution to pick up where we left off + // if saving fails then roll back the session so we can try again next execution + // only need to do this for the managed time strategies + if (!PROVIDED_VALUE.getValue().equals(timeRangeStrategy)) { + try { + saveState(context.getStateManager(), new TimeRange(earliestTime, latestTime)); + } catch (IOException e) { + getLogger().error("Unable to load data from State Manager due to {}", new Object[]{e.getMessage()}, e); + session.rollback(); + context.yield(); + } + } + } + + protected Service createSplunkService(final ProcessContext context) { + final ServiceArgs serviceArgs = new ServiceArgs(); + + final String scheme = context.getProperty(SCHEME).getValue(); + serviceArgs.setScheme(scheme); + + final String host = context.getProperty(HOSTNAME).getValue(); + serviceArgs.setHost(host); + + final int port = context.getProperty(PORT).asInteger(); + serviceArgs.setPort(port); + + final String app = context.getProperty(APP).getValue(); + if (!StringUtils.isBlank(app)) { + serviceArgs.setApp(app); + } + + final String owner = context.getProperty(OWNER).getValue(); + if (!StringUtils.isBlank(owner)) { + serviceArgs.setOwner(owner); + } + + final String token = context.getProperty(TOKEN).getValue(); + if (!StringUtils.isBlank(token)) { + 
serviceArgs.setToken(token); + } + + final String username = context.getProperty(USERNAME).getValue(); + if (!StringUtils.isBlank(username)) { + serviceArgs.setUsername(username); + } + + final String password = context.getProperty(PASSWORD).getValue(); + if (!StringUtils.isBlank(password)) { + serviceArgs.setPassword(password); + } + + final String secProtocol = context.getProperty(SECURITY_PROTOCOL).getValue(); + if (!StringUtils.isBlank(secProtocol) && HTTPS_SCHEME.equals(scheme)) { + serviceArgs.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(secProtocol)); + } + + return Service.connect(serviceArgs); + } + + private void saveState(StateManager stateManager, TimeRange timeRange) throws IOException { + final String earliest = StringUtils.isBlank(timeRange.getEarliestTime()) ? "" : timeRange.getEarliestTime(); + final String latest = StringUtils.isBlank(timeRange.getLatestTime()) ? "" : timeRange.getLatestTime(); + + Map<String,String> state = new HashMap<>(2); + state.put(EARLIEST_TIME_KEY, earliest); + state.put(LATEST_TIME_KEY, latest); + + getLogger().debug("Saving state with earliestTime of {} and latestTime of {}", new Object[] {earliest, latest}); + stateManager.setState(state, Scope.CLUSTER); + } + + private TimeRange loadState(StateManager stateManager) throws IOException { + final StateMap stateMap = stateManager.getState(Scope.CLUSTER); + + if (stateMap.getVersion() < 0) { + getLogger().debug("No previous state found"); + return null; + } + + final String earliest = stateMap.get(EARLIEST_TIME_KEY); + final String latest = stateMap.get(LATEST_TIME_KEY); + getLogger().debug("Loaded state with earliestTime of {} and latestTime of {}", new Object[] {earliest, latest}); + + if (StringUtils.isBlank(earliest) && StringUtils.isBlank(latest)) { + return null; + } else { + return new TimeRange(earliest, latest); + } + } + + static class TimeRange { + + final String earliestTime; + final String latestTime; + + public TimeRange(String earliestTime, String 
latestTime) { + this.earliestTime = earliestTime; + this.latestTime = latestTime; + } + + public String getEarliestTime() { + return earliestTime; + } + + public String getLatestTime() { + return latestTime; + } + + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java new file mode 100644 index 0000000..482b85d --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; +import org.apache.nifi.util.LongHolder; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "tcp", "udp"}) +@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed +@CapabilityDescription("Sends logs to Splunk Enterprise over TCP, TCP + TLS/SSL, or UDP. 
If a Message " + + "Delimiter is provided, then this processor will read messages from the incoming FlowFile based on the " + + "delimiter, and send each message to Splunk. If a Message Delimiter is not provided then the content of " + + "the FlowFile will be sent directly to Splunk as if it were a single message.") +public class PutSplunk extends AbstractPutEventProcessor { + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + + "messages will be sent over a secure connection.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final char NEW_LINE_CHAR = '\n'; + + @Override + protected List<PropertyDescriptor> getAdditionalProperties() { + return Arrays.asList( + PROTOCOL, + MESSAGE_DELIMITER, + SSL_CONTEXT_SERVICE + ); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final Collection<ValidationResult> results = new ArrayList<>(); + + final String protocol = context.getProperty(PROTOCOL).getValue(); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + if (UDP_VALUE.getValue().equals(protocol) && sslContextService != null) { + results.add(new ValidationResult.Builder() + .explanation("SSL can not be used with UDP") + .valid(false).subject("SSL Context").build()); + } + + return results; + } + + @OnStopped + public void cleanup() { + for (final FlowFileMessageBatch batch : activeBatches) { + batch.cancelOrComplete(); + } + + FlowFileMessageBatch batch; + while ((batch = completeBatches.poll()) != null) { + batch.completeSession(); + } + } + + @Override + protected String createTransitUri(ProcessContext context) { + final String port = context.getProperty(PORT).getValue(); + final String 
host = context.getProperty(HOSTNAME).getValue(); + final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase(); + return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString(); + } + + @Override + protected ChannelSender createSender(ProcessContext context) throws IOException { + final int port = context.getProperty(PORT).asInteger(); + final String host = context.getProperty(HOSTNAME).getValue(); + final String protocol = context.getProperty(PROTOCOL).getValue(); + final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + SSLContext sslContext = null; + if (sslContextService != null) { + sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + } + + return createSender(protocol, host, port, timeout, maxSendBuffer, sslContext); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + // first complete any batches from previous executions + FlowFileMessageBatch batch; + while ((batch = completeBatches.poll()) != null) { + batch.completeSession(); + } + + // create a session and try to get a FlowFile, if none available then close any idle senders + final ProcessSession session = sessionFactory.createSession(); + final FlowFile flowFile = session.get(); + if (flowFile == null) { + pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + context.yield(); + return; + } + + // get a sender from the pool, or create a new one if the pool is empty + // if we can't create a new connection then route flow files to failure and yield + ChannelSender sender = senderPool.poll(); + 
if (sender == null) { + try { + getLogger().debug("No available connections, creating a new one..."); + sender = createSender(context); + } catch (IOException e) { + getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", + new Object[]{flowFile}, e); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + return; + } + } + + try { + String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); + if (delimiter != null) { + delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + } + + // if no delimiter then treat the whole FlowFile as a single message + if (delimiter == null) { + processSingleMessage(context, session, flowFile, sender); + } else { + processDelimitedMessages(context, session, flowFile, sender, delimiter); + } + + } finally { + // if the connection is still open and no IO errors happened then try to return, if pool is full then close + if (sender.isConnected()) { + boolean returned = senderPool.offer(sender); + if (!returned) { + sender.close(); + } + } else { + // probably already closed here, but quietly close anyway to be safe + sender.close(); + } + + } + } + + /** + * Send the entire FlowFile as a single message. 
+ */ + private void processSingleMessage(ProcessContext context, ProcessSession session, FlowFile flowFile, ChannelSender sender) { + // copy the contents of the FlowFile to the ByteArrayOutputStream + final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)flowFile.getSize() + 1); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.copy(in, baos); + } + }); + + // if TCP and we don't end in a new line then add one + final String protocol = context.getProperty(PROTOCOL).getValue(); + if (protocol.equals(TCP_VALUE.getValue())) { + final byte[] buf = baos.getUnderlyingBuffer(); + if (buf[baos.size() - 1] != NEW_LINE_CHAR) { + baos.write(NEW_LINE_CHAR); + } + } + + // create a message batch of one message and add to active batches + final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile); + messageBatch.setNumMessages(1); + activeBatches.add(messageBatch); + + // attempt to send the data and add the appropriate range + try { + sender.send(baos.toByteArray()); + messageBatch.addSuccessfulRange(0L, flowFile.getSize()); + } catch (IOException e) { + messageBatch.addFailedRange(0L, flowFile.getSize(), e); + context.yield(); + } + } + + /** + * Read delimited messages from the FlowFile tracking which messages are sent successfully. + */ + private void processDelimitedMessages(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, + final ChannelSender sender, final String delimiter) { + + final String protocol = context.getProperty(PROTOCOL).getValue(); + final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8); + + // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see if it matches + // some pattern. 
We can use this to search for the delimiter as we read through the stream of bytes in the FlowFile + final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes); + + final LongHolder messagesSent = new LongHolder(0L); + final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile); + activeBatches.add(messageBatch); + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + byte[] data = null; // contents of a single message + boolean streamFinished = false; + + int nextByte; + try (final InputStream bufferedIn = new BufferedInputStream(rawIn); + final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { + + long messageStartOffset = in.getBytesConsumed(); + + // read until we're out of data. + while (!streamFinished) { + nextByte = in.read(); + + if (nextByte > -1) { + baos.write(nextByte); + } + + if (nextByte == -1) { + // we ran out of data. This message is complete. + data = getMessage(baos, baos.size(), protocol); + streamFinished = true; + } else if (buffer.addAndCompare((byte) nextByte)) { + // we matched our delimiter. This message is complete. We want all of the bytes from the + // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want + // the delimiter itself to be sent. + data = getMessage(baos, baos.size() - delimiterBytes.length, protocol); + } + + if (data != null) { + final long messageEndOffset = in.getBytesConsumed(); + + // If the message has no data, ignore it. 
+ if (data.length != 0) { + final long rangeStart = messageStartOffset; + try { + sender.send(data); + messageBatch.addSuccessfulRange(rangeStart, messageEndOffset); + messagesSent.incrementAndGet(); + } catch (final IOException e) { + messageBatch.addFailedRange(rangeStart, messageEndOffset, e); + } + } + + // reset BAOS so that we can start a new message. + baos.reset(); + data = null; + messageStartOffset = in.getBytesConsumed(); + } + } + } + }); + + messageBatch.setNumMessages(messagesSent.get()); + } + } + + /** + * Helper to get the bytes of a message from the ByteArrayOutputStream, factoring in whether we need + * a new line at the end of our message. + * + * @param baos the ByteArrayOutputStream to get data from + * @param length the amount of data to copy from the baos + * @param protocol the protocol (TCP or UDP) + * + * @return the bytes from 0 to length, including a new line if the protocol was TCP + */ + private byte[] getMessage(final ByteArrayOutputStream baos, final int length, final String protocol) { + if (baos.size() == 0) { + return null; + } + + final byte[] buf = baos.getUnderlyingBuffer(); + + // if TCP and we don't already end with a new line then add one + if (protocol.equals(TCP_VALUE.getValue()) && buf[length - 1] != NEW_LINE_CHAR) { + byte[] message = new byte[length + 1]; + + for (int i=0; i < length; i++) { + message[i] = buf[i]; + } + message[message.length - 1] = NEW_LINE_CHAR; + return message; + } else { + return Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, length); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..4612039 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.nifi.processors.splunk.GetSplunk +org.apache.nifi.processors.splunk.PutSplunk \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java new file mode 100644 index 0000000..42daab6 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestGetSplunk.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + +import com.splunk.JobExportArgs; +import com.splunk.Service; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestGetSplunk { + + private Service service; + private TestableGetSplunk proc; + private TestRunner runner; + + @Before + public void setup() { + service = Mockito.mock(Service.class); + proc = new TestableGetSplunk(service); + + runner = TestRunners.newTestRunner(proc); + } + + @Test + public void testCustomValidation() { + final String query = "search tcp:7879"; + final String providedEarliest = "-1h"; + final String providedLatest = "now"; + final String outputMode = GetSplunk.ATOM_VALUE.getValue(); + + runner.setProperty(GetSplunk.QUERY, query); + runner.setProperty(GetSplunk.EARLIEST_TIME, providedEarliest); + runner.setProperty(GetSplunk.LATEST_TIME, providedLatest); + runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode); + runner.assertValid(); + + runner.setProperty(GetSplunk.USERNAME, "user1"); + runner.assertNotValid(); + + runner.setProperty(GetSplunk.PASSWORD, "password"); 
+ runner.assertValid(); + } + + @Test + public void testGetWithProvidedTime() { + final String query = "search tcp:7879"; + final String providedEarliest = "-1h"; + final String providedLatest = "now"; + final String outputMode = GetSplunk.ATOM_VALUE.getValue(); + + runner.setProperty(GetSplunk.QUERY, query); + runner.setProperty(GetSplunk.EARLIEST_TIME, providedEarliest); + runner.setProperty(GetSplunk.LATEST_TIME, providedLatest); + runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode); + + final JobExportArgs expectedArgs = new JobExportArgs(); + expectedArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL); + expectedArgs.setEarliestTime(providedEarliest); + expectedArgs.setLatestTime(providedLatest); + expectedArgs.setOutputMode(JobExportArgs.OutputMode.valueOf(outputMode)); + + final String resultContent = "fake results"; + final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8)); + when(service.export(eq(query), argThat(new JobExportArgsMatcher(expectedArgs)))).thenReturn(input); + + runner.run(); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1); + + final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetSplunk.REL_SUCCESS); + Assert.assertEquals(1, mockFlowFiles.size()); + + final MockFlowFile mockFlowFile = mockFlowFiles.get(0); + mockFlowFile.assertContentEquals(resultContent); + mockFlowFile.assertAttributeEquals(GetSplunk.QUERY_ATTR, query); + mockFlowFile.assertAttributeEquals(GetSplunk.EARLIEST_TIME_ATTR, providedEarliest); + mockFlowFile.assertAttributeEquals(GetSplunk.LATEST_TIME_ATTR, providedLatest); + Assert.assertEquals(1, proc.count); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertEquals(1, events.size()); + Assert.assertEquals(ProvenanceEventType.RECEIVE, events.get(0).getEventType()); + Assert.assertEquals("https://localhost:8089", events.get(0).getTransitUri()); + } + + @Test + public void 
testMultipleIterationsWithoutShuttingDown() { + final String query = "search tcp:7879"; + final String providedEarliest = "-1h"; + final String providedLatest = "now"; + final String outputMode = GetSplunk.ATOM_VALUE.getValue(); + + runner.setProperty(GetSplunk.QUERY, query); + runner.setProperty(GetSplunk.EARLIEST_TIME, providedEarliest); + runner.setProperty(GetSplunk.LATEST_TIME, providedLatest); + runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode); + + final JobExportArgs expectedArgs = new JobExportArgs(); + expectedArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL); + expectedArgs.setEarliestTime(providedEarliest); + expectedArgs.setLatestTime(providedLatest); + expectedArgs.setOutputMode(JobExportArgs.OutputMode.valueOf(outputMode)); + + final String resultContent = "fake results"; + final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8)); + when(service.export(eq(query), argThat(new JobExportArgsMatcher(expectedArgs)))).thenReturn(input); + + final int iterations = 3; + runner.run(iterations, false); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, iterations); + Assert.assertEquals(1, proc.count); + } + + @Test + public void testGetWithManagedFromBeginning() { + final String query = "search tcp:7879"; + final String outputMode = GetSplunk.ATOM_VALUE.getValue(); + + runner.setProperty(GetSplunk.QUERY, query); + runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode); + runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_BEGINNING_VALUE.getValue()); + + final String resultContent = "fake results"; + final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8)); + when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input); + + // run once and don't shut down + runner.run(1, false); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1); + + // capture what the args were on last run + final 
ArgumentCaptor<JobExportArgs> capture1 = ArgumentCaptor.forClass(JobExportArgs.class); + verify(service, times(1)).export(eq(query), capture1.capture()); + + // first execution with no previous state and "managed from beginning" should have a latest time and no earliest time + final JobExportArgs actualArgs1 = capture1.getValue(); + Assert.assertNotNull(actualArgs1); + Assert.assertNull(actualArgs1.get("earliest_time")); + Assert.assertNotNull(actualArgs1.get("latest_time")); + + // save the latest time from the first run which should be earliest time of next run + final String expectedLatest = (String) actualArgs1.get("latest_time"); + + // run again + runner.run(1, false); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 2); + + final ArgumentCaptor<JobExportArgs> capture2 = ArgumentCaptor.forClass(JobExportArgs.class); + verify(service, times(2)).export(eq(query), capture2.capture()); + + // second execution the earliest time should be the previous latest_time + final JobExportArgs actualArgs2 = capture2.getValue(); + Assert.assertNotNull(actualArgs2); + Assert.assertEquals(expectedLatest, actualArgs2.get("earliest_time")); + Assert.assertNotNull(actualArgs2.get("latest_time")); + } + + @Test + public void testGetWithManagedFromCurrent() throws IOException { + final String query = "search tcp:7879"; + final String outputMode = GetSplunk.ATOM_VALUE.getValue(); + + runner.setProperty(GetSplunk.QUERY, query); + runner.setProperty(GetSplunk.OUTPUT_MODE, outputMode); + runner.setProperty(GetSplunk.TIME_RANGE_STRATEGY, GetSplunk.MANAGED_CURRENT_VALUE.getValue()); + + final String resultContent = "fake results"; + final ByteArrayInputStream input = new ByteArrayInputStream(resultContent.getBytes(StandardCharsets.UTF_8)); + when(service.export(eq(query), any(JobExportArgs.class))).thenReturn(input); + + // run once and don't shut down, shouldn't produce any results first time + runner.run(1, false); + 
runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 0); + + // capture what the args were on last run + verify(service, times(0)).export(eq(query), any(JobExportArgs.class)); + + final StateMap state = runner.getStateManager().getState(Scope.CLUSTER); + Assert.assertNotNull(state); + Assert.assertTrue(state.getVersion() > 0); + + // save the latest time from the first run which should be earliest time of next run + final String expectedLatest = state.get(GetSplunk.LATEST_TIME_KEY); + + // run again + runner.run(1, false); + runner.assertAllFlowFilesTransferred(GetSplunk.REL_SUCCESS, 1); + + final ArgumentCaptor<JobExportArgs> capture = ArgumentCaptor.forClass(JobExportArgs.class); + verify(service, times(1)).export(eq(query), capture.capture()); + + // second execution the earliest time should be the previous latest_time + final JobExportArgs actualArgs = capture.getValue(); + Assert.assertNotNull(actualArgs); + Assert.assertEquals(expectedLatest, actualArgs.get("earliest_time")); + Assert.assertNotNull(actualArgs.get("latest_time")); + } + + + /** + * Testable implementation of GetSplunk to return a Mock Splunk Service. + */ + private static class TestableGetSplunk extends GetSplunk { + + int count; + Service mockService; + + public TestableGetSplunk(Service mockService) { + this.mockService = mockService; + } + + @Override + protected Service createSplunkService(ProcessContext context) { + count++; + return mockService; + } + } + + /** + * Custom args matcher for JobExportArgs. 
+ */ + private static class JobExportArgsMatcher extends ArgumentMatcher<JobExportArgs> { + + private JobExportArgs expected; + + public JobExportArgsMatcher(JobExportArgs expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + if (o == null) { + return false; + } + + if (!(o instanceof JobExportArgs)) { + return false; + } + + JobExportArgs other = (JobExportArgs) o; + return expected.equals(other); + } + + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java new file mode 100644 index 0000000..bb55372 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.splunk; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class TestPutSplunk { + + private TestRunner runner; + private TestablePutSplunk proc; + private CapturingChannelSender sender; + + @Before + public void init() { + ProcessorLog logger = Mockito.mock(ProcessorLog.class); + sender = new CapturingChannelSender("localhost", 12345, 0, logger); + proc = new TestablePutSplunk(sender); + + runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutSplunk.PORT, "12345"); + } + + @Test + public void testUDPSendWholeFlowFile() { + final String message = "This is one message, should send the whole FlowFile"; + + runner.enqueue(message); + runner.run(1); + runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(message); + + Assert.assertEquals(1, sender.getMessages().size()); + Assert.assertEquals(message, sender.getMessages().get(0)); + } + + @Test + public void testTCPSendWholeFlowFile() { + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue()); + + final String message = "This is one message, should send the whole FlowFile"; + + runner.enqueue(message); + runner.run(1); + runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + 
mockFlowFile.assertContentEquals(message); + + Assert.assertEquals(1, sender.getMessages().size()); + Assert.assertEquals(message + "\n", sender.getMessages().get(0)); + } + + @Test + public void testTCPSendWholeFlowFileAlreadyHasNewLine() { + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue()); + + final String message = "This is one message, should send the whole FlowFile\n"; + + runner.enqueue(message); + runner.run(1); + runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(message); + + Assert.assertEquals(1, sender.getMessages().size()); + Assert.assertEquals(message, sender.getMessages().get(0)); + } + + @Test + public void testUDPSendDelimitedMessages() { + final String delimiter = "DD"; + runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter); + + final String message = "This is message 1DDThis is message 2DDThis is message 3"; + + runner.enqueue(message); + runner.run(1); + runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(message); + + Assert.assertEquals(3, sender.getMessages().size()); + Assert.assertEquals("This is message 1", sender.getMessages().get(0)); + Assert.assertEquals("This is message 2", sender.getMessages().get(1)); + Assert.assertEquals("This is message 3", sender.getMessages().get(2)); + } + + @Test + public void testTCPSendDelimitedMessages() { + final String delimiter = "DD"; + runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter); + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue()); + + // no delimiter at end + final String message = "This is message 1DDThis is message 2DDThis is message 3"; + + runner.enqueue(message); + runner.run(1); + 
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(message); + + Assert.assertEquals(3, sender.getMessages().size()); + Assert.assertEquals("This is message 1\n", sender.getMessages().get(0)); + Assert.assertEquals("This is message 2\n", sender.getMessages().get(1)); + Assert.assertEquals("This is message 3\n", sender.getMessages().get(2)); + } + + @Test + public void testTCPSendDelimitedMessagesWithEL() { + final String delimiter = "DD"; + runner.setProperty(PutSplunk.MESSAGE_DELIMITER, "${flow.file.delim}"); + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue()); + + // no delimiter at end + final String message = "This is message 1DDThis is message 2DDThis is message 3"; + + final Map<String,String> attrs = new HashMap<>(); + attrs.put("flow.file.delim", delimiter); + + runner.enqueue(message, attrs); + runner.run(1); + runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(message); + + Assert.assertEquals(3, sender.getMessages().size()); + Assert.assertEquals("This is message 1\n", sender.getMessages().get(0)); + Assert.assertEquals("This is message 2\n", sender.getMessages().get(1)); + Assert.assertEquals("This is message 3\n", sender.getMessages().get(2)); + } + + @Test + public void testTCPSendDelimitedMessagesEndsWithDelimiter() { + final String delimiter = "DD"; + runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter); + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue()); + + // delimiter at end + final String message = "This is message 1DDThis is message 2DDThis is message 3DD"; + + runner.enqueue(message); + runner.run(1); + runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1); + + final MockFlowFile 
mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(message); + + Assert.assertEquals(3, sender.getMessages().size()); + Assert.assertEquals("This is message 1\n", sender.getMessages().get(0)); + Assert.assertEquals("This is message 2\n", sender.getMessages().get(1)); + Assert.assertEquals("This is message 3\n", sender.getMessages().get(2)); + } + + @Test + public void testTCPSendDelimitedMessagesWithNewLineDelimiter() { + final String delimiter = "\\n"; + runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter); + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue()); + + final String message = "This is message 1\nThis is message 2\nThis is message 3"; + + runner.enqueue(message); + runner.run(1); + runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(message); + + Assert.assertEquals(3, sender.getMessages().size()); + Assert.assertEquals("This is message 1\n", sender.getMessages().get(0)); + Assert.assertEquals("This is message 2\n", sender.getMessages().get(1)); + Assert.assertEquals("This is message 3\n", sender.getMessages().get(2)); + } + + @Test + public void testTCPSendDelimitedMessagesWithErrors() { + sender.setErrorStart(3); + sender.setErrorEnd(4); + + final String delimiter = "DD"; + runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter); + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue()); + + // no delimiter at end + final String success = "This is message 1DDThis is message 2DD"; + final String failure = "This is message 3DDThis is message 4"; + final String message = success + failure; + + runner.enqueue(message); + runner.run(1); + runner.assertTransferCount(PutSplunk.REL_SUCCESS, 1); + runner.assertTransferCount(PutSplunk.REL_FAILURE, 1); + + // first two messages should have gone out successfully + 
final MockFlowFile successFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + successFlowFile.assertContentEquals(success); + + // second two messages should have gone to failure + final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0); + failureFlowFile.assertContentEquals(failure); + + // should only have the first two messages + Assert.assertEquals(2, sender.getMessages().size()); + Assert.assertEquals("This is message 1\n", sender.getMessages().get(0)); + Assert.assertEquals("This is message 2\n", sender.getMessages().get(1)); + } + + @Test + public void testTCPSendDelimitedMessagesWithErrorsInMiddle() { + sender.setErrorStart(3); + sender.setErrorEnd(4); + + final String delimiter = "DD"; + runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter); + runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue()); + + // no delimiter at end + final String success = "This is message 1DDThis is message 2DD"; + final String failure = "This is message 3DDThis is message 4DD"; + final String success2 = "This is message 5DDThis is message 6DDThis is message 7DD"; + final String message = success + failure + success2; + + runner.enqueue(message); + runner.run(1); + runner.assertTransferCount(PutSplunk.REL_SUCCESS, 2); + runner.assertTransferCount(PutSplunk.REL_FAILURE, 1); + + // first two messages should have gone out successfully + final MockFlowFile successFlowFile1 = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + successFlowFile1.assertContentEquals(success); + + // last three messages should have gone out successfully + final MockFlowFile successFlowFile2 = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(1); + successFlowFile2.assertContentEquals(success2); + + // second two messages should have gone to failure + final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0); + failureFlowFile.assertContentEquals(failure);
+ + // should have the five messages that were sent successfully (3 and 4 failed) + Assert.assertEquals(5, sender.getMessages().size()); + Assert.assertEquals("This is message 1\n", sender.getMessages().get(0)); + Assert.assertEquals("This is message 2\n", sender.getMessages().get(1)); + Assert.assertEquals("This is message 5\n", sender.getMessages().get(2)); + Assert.assertEquals("This is message 6\n", sender.getMessages().get(3)); + Assert.assertEquals("This is message 7\n", sender.getMessages().get(4)); + } + + @Test + public void testCompletingPreviousBatchOnNextExecution() { + final String message = "This is one message, should send the whole FlowFile"; + + runner.enqueue(message); + runner.run(2, false); // don't shut down, to prove that the next onTrigger completes the previous batch + runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1); + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0); + mockFlowFile.assertContentEquals(message); + + Assert.assertEquals(1, sender.getMessages().size()); + Assert.assertEquals(message, sender.getMessages().get(0)); + } + + /** + * Extend PutSplunk to use a CapturingChannelSender. + */ + private static class TestablePutSplunk extends PutSplunk { + + private ChannelSender sender; + + public TestablePutSplunk(ChannelSender channelSender) { + this.sender = channelSender; + } + + @Override + protected ChannelSender createSender(String protocol, String host, int port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws IOException { + return sender; + } + } + + /** + * A ChannelSender that captures each message that was sent.
+ */ + private static class CapturingChannelSender extends ChannelSender { + + private List<String> messages = new ArrayList<>(); + private int count = 0; + private int errorStart = -1; + private int errorEnd = -1; + + public CapturingChannelSender(String host, int port, int maxSendBufferSize, ProcessorLog logger) { + super(host, port, maxSendBufferSize, logger); + } + + @Override + public void open() throws IOException { + + } + + @Override + protected void write(byte[] data) throws IOException { + count++; + if (errorStart > 0 && count >= errorStart && errorEnd > 0 && count <= errorEnd) { + throw new IOException("this is an error"); + } + messages.add(new String(data, StandardCharsets.UTF_8)); + } + + @Override + public boolean isConnected() { + return false; + } + + @Override + public void close() { + + } + + public List<String> getMessages() { + return messages; + } + + public void setErrorStart(int errorStart) { + this.errorStart = errorStart; + } + + public void setErrorEnd(int errorEnd) { + this.errorEnd = errorEnd; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/util/LogGenerator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/util/LogGenerator.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/util/LogGenerator.java new file mode 100644 index 0000000..309893b --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/util/LogGenerator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Command-line utility that writes a configurable number of
 * delimiter-separated log messages to a file; used to produce test input
 * for the Splunk processors.
 */
public class LogGenerator {

    static final String LOG_MESSAGE = "This is log message # %s";

    private final int numLogs;
    private final String delimiter;

    /**
     * @param numLogs   the number of messages to write
     * @param delimiter written between consecutive messages (never after the last)
     */
    public LogGenerator(int numLogs, String delimiter) {
        this.numLogs = numLogs;
        this.delimiter = delimiter;
    }

    /**
     * Writes {@code numLogs} messages to the given file, replacing its content.
     *
     * @param file the destination file
     * @throws IOException if the file cannot be written
     */
    public void generate(final File file) throws IOException {
        try (OutputStream rawOut = new FileOutputStream(file);
             BufferedOutputStream out = new BufferedOutputStream(rawOut)) {

            for (int i = 0; i < numLogs; i++) {
                // the delimiter separates messages, so none before the first
                if (i > 0) {
                    out.write(delimiter.getBytes(StandardCharsets.UTF_8));
                }

                final String message = String.format(LOG_MESSAGE, i);
                out.write(message.getBytes(StandardCharsets.UTF_8));
            }
            // no explicit flush needed: try-with-resources close() flushes
        }
        System.out.println("Done generating " + numLogs + " messages");
    }

    public static void main(String[] args) throws IOException {
        if (args == null || args.length != 3) {
            System.err.println("USAGE: LogGenerator <num_logs> <delimiter> <file>");
            System.exit(1);
        }

        final int numLogs = Integer.parseInt(args[0]);
        final String delim = args[1];

        final File file = new File(args[2]);
        // bug fix: the original ignored File.delete()'s boolean result; fail
        // loudly instead of silently writing to a file we could not replace
        if (file.exists() && !file.delete()) {
            System.err.println("Unable to delete existing file " + file.getAbsolutePath());
            System.exit(1);
        }

        final LogGenerator generator = new LogGenerator(numLogs, delim);
        generator.generate(file);
    }
}
/* --- archive continuation (next entry in the commit dump) ---
http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/pom.xml b/nifi-nar-bundles/nifi-splunk-bundle/pom.xml
new file mode 100644
index 0000000..daab02f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
*/
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-bundles</artifactId>
        <version>0.6.0-SNAPSHOT</version>
    </parent>

    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-splunk-bundle</artifactId>
    <version>0.6.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>nifi-splunk-processors</module>
        <module>nifi-splunk-nar</module>
    </modules>

    <repositories>
        <!-- NOTE(review): plain-http repository URL; Maven 3.8.1+ blocks
             http repositories by default. Confirm whether an https endpoint
             exists for the Splunk artifactory. -->
        <repository>
            <id>splunk</id>
            <name>Splunk Artifactory</name>
            <url>http://splunk.artifactoryonline.com/splunk/ext-releases-local/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencyManagement>
        <dependencies>
            <!-- pins the Splunk SDK version for the child modules -->
            <dependency>
                <groupId>com.splunk</groupId>
                <artifactId>splunk</artifactId>
                <version>1.5.0.0</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index e51ba6c..a8a6bef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java @@ -49,6 +49,14 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor { .defaultValue("UTF-8") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Timeout") + .description("The timeout for connecting to and communicating with the syslog server. Does not apply to UDP") + .required(false) + .defaultValue("10 seconds") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + }
