NIFI-1420 Adding Splunk bundle containing PutSplunk, and GetSplunk, and adding a ListenTCP processor to standard processors. Refactored internal code from PutSyslog to create a generic AbstractPutEventProcessor which PutSplunk extends from.
This closes #233 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6f5fb594 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6f5fb594 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6f5fb594 Branch: refs/heads/master Commit: 6f5fb5947911baf3da6892310d01148d8aa50f30 Parents: 4ce7b67 Author: Bryan Bende <[email protected]> Authored: Mon Mar 7 18:07:06 2016 -0500 Committer: Bryan Bende <[email protected]> Committed: Mon Mar 7 18:21:17 2016 -0500 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 5 + .../listen/AbstractListenEventProcessor.java | 4 + .../dispatcher/SocketChannelDispatcher.java | 31 +- .../handler/socket/SSLSocketChannelHandler.java | 19 +- .../socket/StandardSocketChannelHandler.java | 25 +- .../util/put/AbstractPutEventProcessor.java | 476 ++++++++++++++++ .../util/put/sender/ChannelSender.java | 103 ++++ .../util/put/sender/DatagramChannelSender.java | 80 +++ .../util/put/sender/SSLSocketChannelSender.java | 71 +++ .../util/put/sender/SocketChannelSender.java | 97 ++++ .../remote/io/socket/ssl/SSLSocketChannel.java | 24 +- .../nifi-splunk-bundle/nifi-splunk-nar/pom.xml | 42 ++ .../src/main/resources/META-INF/LICENSE | 203 +++++++ .../src/main/resources/META-INF/NOTICE | 24 + .../nifi-splunk-processors/pom.xml | 68 +++ .../nifi/processors/splunk/GetSplunk.java | 543 +++++++++++++++++++ .../nifi/processors/splunk/PutSplunk.java | 342 ++++++++++++ .../org.apache.nifi.processor.Processor | 16 + .../nifi/processors/splunk/TestGetSplunk.java | 283 ++++++++++ .../nifi/processors/splunk/TestPutSplunk.java | 370 +++++++++++++ .../processors/splunk/util/LogGenerator.java | 73 +++ nifi-nar-bundles/nifi-splunk-bundle/pom.xml | 59 ++ .../standard/AbstractSyslogProcessor.java | 8 + .../nifi/processors/standard/ListenTCP.java | 226 ++++++++ .../nifi/processors/standard/PutSyslog.java | 215 +------- .../org.apache.nifi.processor.Processor | 1 + .../standard/TestListenAndPutSyslog.java | 175 ++++++ .../processors/standard/TestListenRELP.java | 24 +- .../processors/standard/TestListenSyslog.java | 13 +- .../nifi/processors/standard/TestListenTCP.java | 275 ++++++++++ .../nifi/processors/standard/TestPutSyslog.java | 51 +- nifi-nar-bundles/pom.xml | 1 + pom.xml | 6 + 33 files changed, 3711 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index a7dfb3f..6088e63 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -275,6 +275,11 @@ language governing permissions and limitations under the License. --> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-amqp-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-splunk-nar</artifactId> <type>nar</type> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java index d56255d..84fa5dc 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -220,6 +220,10 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst return errorEvents.size(); } + public int getQueueSize() { + return events == null ? 0 : events.size(); + } + @OnUnscheduled public void onUnscheduled() { if (dispatcher != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java index da5c414..670dba4 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java @@ -22,6 +22,7 @@ import org.apache.nifi.processor.util.listen.event.Event; import org.apache.nifi.processor.util.listen.event.EventFactory; import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.security.util.SslContextFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -56,6 +57,7 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements private final ProcessorLog logger; private final int maxConnections; private final SSLContext sslContext; + private final SslContextFactory.ClientAuth clientAuth; private final Charset charset; private ExecutorService executor; @@ -64,6 +66,16 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements private final BlockingQueue<SelectionKey> keyQueue; private final AtomicInteger currentConnections = new AtomicInteger(0); + public SocketChannelDispatcher(final EventFactory<E> eventFactory, + final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, + final BlockingQueue<ByteBuffer> bufferPool, + final BlockingQueue<E> events, + final ProcessorLog logger, + final int maxConnections, + final SSLContext sslContext, + final Charset charset) { + this(eventFactory, handlerFactory, bufferPool, events, logger, maxConnections, sslContext, SslContextFactory.ClientAuth.REQUIRED, charset); + } public SocketChannelDispatcher(final EventFactory<E> eventFactory, final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, @@ -72,6 +84,7 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements final ProcessorLog logger, final int maxConnections, final SSLContext sslContext, + final SslContextFactory.ClientAuth clientAuth, final Charset charset) { this.eventFactory = eventFactory; this.handlerFactory = handlerFactory; @@ -81,6 +94,7 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements this.maxConnections = maxConnections; this.keyQueue = new LinkedBlockingQueue<>(maxConnections); this.sslContext = sslContext; + this.clientAuth = clientAuth; this.charset = charset; if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) { @@ -152,7 +166,22 @@ public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements SSLSocketChannel sslSocketChannel = null; if (sslContext != null) { final SSLEngine sslEngine = sslContext.createSSLEngine(); - sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel, false); + sslEngine.setUseClientMode(false); + + switch (clientAuth) { + case REQUIRED: + sslEngine.setNeedClientAuth(true); + break; + case WANT: + sslEngine.setWantClientAuth(true); + break; + case NONE: + sslEngine.setNeedClientAuth(false); + sslEngine.setWantClientAuth(false); + break; + } + + sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel); } // Attach the buffer and SSLSocketChannel to the key http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java index 6a2c6f8..460ef08 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java @@ -129,17 +129,20 @@ public class SSLSocketChannelHandler<E extends Event<SocketChannel>> extends Soc // go through the buffer looking for the end of each message for (int i = 0; i < bytesRead; i++) { final byte currByte = buffer[i]; - currBytes.write(currByte); // check if at end of a message if (currByte == getDelimiter()) { - final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); - final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - - // queue the raw event blocking until space is available, reset the temporary buffer - final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.put(event); - currBytes.reset(); + if (currBytes.size() > 0) { + final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); + final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); + + // queue the raw event blocking until space is available, reset the temporary buffer + final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); + events.put(event); + currBytes.reset(); + } + } else { + currBytes.write(currByte); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java index f12e705..e2fd3a8 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java @@ -131,20 +131,23 @@ public class StandardSocketChannelHandler<E extends Event<SocketChannel>> extend // NOTE: For higher throughput, the looking for \n and copying into the byte stream could be improved // Pull data out of buffer and cram into byte array byte currByte = socketBuffer.get(); - currBytes.write(currByte); // check if at end of a message if (currByte == getDelimiter()) { - final SocketChannelResponder response = new SocketChannelResponder(socketChannel); - final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - - // queue the raw event blocking until space is available, reset the buffer - final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.put(event); - currBytes.reset(); - - // Mark this as the start of the next message - socketBuffer.mark(); + if (currBytes.size() > 0) { + final SocketChannelResponder response = new SocketChannelResponder(socketChannel); + final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); + + // queue the raw event blocking until space is available, reset the buffer + final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); + events.put(event); + currBytes.reset(); + + // Mark this as the start of the next message + socketBuffer.mark(); + } + } else { + currBytes.write(currByte); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java new file mode 100644 index 0000000..c7313dc --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.put; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.processor.util.put.sender.DatagramChannelSender; +import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender; +import org.apache.nifi.processor.util.put.sender.SocketChannelSender; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * A base class for processors that send data to an external system using TCP or UDP. + */ +public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryProcessor { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The ip address or hostname of the destination.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("localhost") + .required(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The port on the destination.") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Max Size of Socket Send Buffer") + .description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " + + "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " + + "the data can be read, and incoming data will be dropped.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .required(true) + .build(); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the data being sent.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Timeout") + .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP") + .required(false) + .defaultValue("10 seconds") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor + .Builder().name("Idle Connection Expiration") + .description("The amount of time a connection should be held open without being used before closing the connection.") + .required(true) + .defaultValue("5 seconds") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + // Putting these properties here so sub-classes don't have to redefine them, but they are + // not added to the properties by default since not all processors may need them + + public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP"); + public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP"); + + public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor + .Builder().name("Protocol") + .description("The protocol for communication.") + .required(true) + .allowableValues(TCP_VALUE, UDP_VALUE) + .defaultValue(UDP_VALUE.getValue()) + .build(); + public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() + .name("Message Delimiter") + .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + + "If not specified, the entire content of the FlowFile will be used as a single message. " + + "If specified, the contents of the FlowFile will be split on this delimiter and each section " + + "sent as a separate message. Note that if messages are delimited and some messages for a given FlowFile " + + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those " + + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' " + + "relationship.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are sent successfully to the destination are sent out this relationship.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to send to the destination are sent out this relationship.") + .build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + + protected volatile String transitUri; + protected volatile BlockingQueue<ChannelSender> senderPool; + + protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>(); + protected final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>()); + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(HOSTNAME); + descriptors.add(PORT); + descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE); + descriptors.add(CHARSET); + descriptors.add(TIMEOUT); + descriptors.add(IDLE_EXPIRATION); + descriptors.addAll(getAdditionalProperties()); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.addAll(getAdditionalRelationships()); + this.relationships = Collections.unmodifiableSet(relationships); + } + + /** + * Override to provide additional relationships for the processor. + * + * @return a list of relationships + */ + protected List<Relationship> getAdditionalRelationships() { + return Collections.EMPTY_LIST; + } + + /** + * Override to provide additional properties for the processor. + * + * @return a list of properties + */ + protected List<PropertyDescriptor> getAdditionalProperties() { + return Collections.EMPTY_LIST; + } + + @Override + public final Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + // initialize the queue of senders, one per task, senders will get created on the fly in onTrigger + this.senderPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); + this.transitUri = createTransitUri(context); + } + + @OnStopped + public void closeSenders() { + if (senderPool != null) { + ChannelSender sender = senderPool.poll(); + while (sender != null) { + sender.close(); + sender = senderPool.poll(); + } + } + } + + /** + * Sub-classes construct a transit uri for provenance events. Called from @OnScheduled + * method of this class. + * + * @param context the current context + * + * @return the transit uri + */ + protected abstract String createTransitUri(final ProcessContext context); + + /** + * Sub-classes create a ChannelSender given a context. + * + * @param context the current context + * @return an implementation of ChannelSender + * @throws IOException if an error occurs creating the ChannelSender + */ + protected abstract ChannelSender createSender(final ProcessContext context) throws IOException; + + /** + * Close any senders that haven't been active with in the given threshold + * + * @param idleThreshold the threshold to consider a sender as idle + */ + protected void pruneIdleSenders(final long idleThreshold) { + long currentTime = System.currentTimeMillis(); + final List<ChannelSender> putBack = new ArrayList<>(); + + // if a connection hasn't been used with in the threshold then it gets closed + ChannelSender sender; + while ((sender = senderPool.poll()) != null) { + if (currentTime > (sender.getLastUsed() + idleThreshold)) { + getLogger().debug("Closing idle connection..."); + sender.close(); + } else { + putBack.add(sender); + } + } + // re-queue senders that weren't idle, but if the queue is full then close the sender + for (ChannelSender putBackSender : putBack) { + boolean returned = senderPool.offer(putBackSender); + if (!returned) { + putBackSender.close(); + } + } + } + + /** + * Helper for sub-classes to create a sender. + * + * @param protocol the protocol for the sender + * @param host the host to send to + * @param port the port to send to + * @param timeout the timeout for connecting and communicating over the channel + * @param maxSendBufferSize the maximum size of the socket send buffer + * @param sslContext an SSLContext, or null if not using SSL + * + * @return a ChannelSender based on the given properties + * + * @throws IOException if an error occurs creating the sender + */ + protected ChannelSender createSender(final String protocol, + final String host, + final int port, + final int timeout, + final int maxSendBufferSize, + final SSLContext sslContext) throws IOException { + + ChannelSender sender; + if (protocol.equals(UDP_VALUE.getValue())) { + sender = new DatagramChannelSender(host, port, maxSendBufferSize, getLogger()); + } else { + // if an SSLContextService is provided then we make a secure sender + if (sslContext != null) { + sender = new SSLSocketChannelSender(host, port, maxSendBufferSize, sslContext, getLogger()); + } else { + sender = new SocketChannelSender(host, port, maxSendBufferSize, getLogger()); + } + } + + sender.setTimeout(timeout); + sender.open(); + return sender; + } + + /** + * Represents a range of messages from a FlowFile. + */ + protected static class Range { + private final long start; + private final long end; + + public Range(final long start, final long end) { + this.start = start; + this.end = end; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + @Override + public String toString() { + return "Range[" + start + "-" + end + "]"; + } + } + + /** + * A wrapper to hold the ranges of a FlowFile that were successful and ranges that failed, and then + * transfer those ranges appropriately. + */ + protected class FlowFileMessageBatch { + + private final ProcessSession session; + private final FlowFile flowFile; + private final long startTime = System.nanoTime(); + + private final List<Range> successfulRanges = new ArrayList<>(); + private final List<Range> failedRanges = new ArrayList<>(); + + private Exception lastFailureReason; + private long numMessages = -1L; + private long completeTime = 0L; + private boolean canceled = false; + + public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile) { + this.session = session; + this.flowFile = flowFile; + } + + public synchronized void cancelOrComplete() { + if (isComplete()) { + completeSession(); + return; + } + + this.canceled = true; + + session.rollback(); + successfulRanges.clear(); + failedRanges.clear(); + } + + public synchronized void addSuccessfulRange(final long start, final long end) { + if (canceled) { + return; + } + + successfulRanges.add(new Range(start, end)); + + if (isComplete()) { + activeBatches.remove(this); + completeBatches.add(this); + completeTime = System.nanoTime(); + } + } + + public synchronized void addFailedRange(final long start, final long end, final Exception e) { + if (canceled) { + return; + } + + failedRanges.add(new Range(start, end)); + lastFailureReason = e; + + if (isComplete()) { + activeBatches.remove(this); + completeBatches.add(this); + completeTime = System.nanoTime(); + } + } + + private boolean isComplete() { + return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages); + } + + public synchronized void setNumMessages(final long msgCount) { + this.numMessages = msgCount; + + if (isComplete()) { + activeBatches.remove(this); + completeBatches.add(this); + completeTime = System.nanoTime(); + } + } + + private void transferRanges(final List<Range> ranges, final Relationship relationship) { + Collections.sort(ranges, new Comparator<Range>() { + @Override + public int compare(final Range o1, final Range o2) { + return Long.compare(o1.getStart(), o2.getStart()); + } + }); + + for (int i = 0; i < ranges.size(); i++) { + Range range = ranges.get(i); + int count = 1; + + while (i + 1 < ranges.size()) { + // Check if the next range in the List continues where this one left off. + final Range nextRange = ranges.get(i + 1); + + if (nextRange.getStart() == range.getEnd()) { + // We have two ranges in a row that are contiguous; combine them into a single Range. + range = new Range(range.getStart(), nextRange.getEnd()); + + count++; + i++; + } else { + break; + } + } + + // Create a FlowFile for this range. + FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart()); + if (relationship == REL_SUCCESS) { + session.getProvenanceReporter().send(child, transitUri, "Sent " + count + " messages"); + session.transfer(child, relationship); + } else { + child = session.penalize(child); + session.transfer(child, relationship); + } + } + } + + public synchronized void completeSession() { + if (canceled) { + return; + } + + if (successfulRanges.isEmpty() && failedRanges.isEmpty()) { + getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + session.commit(); + return; + } + + if (successfulRanges.isEmpty()) { + getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason}); + final FlowFile penalizedFlowFile = session.penalize(flowFile); + session.transfer(penalizedFlowFile, REL_FAILURE); + session.commit(); + return; + } + + if (failedRanges.isEmpty()) { + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime); + session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + successfulRanges.size() + " messages;", transferMillis); + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("Successfully sent {} messages for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis}); + session.commit(); + return; + } + + // At this point, the successful ranges is not empty and the failed ranges is not empty. This indicates that some messages made their way + // successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success' + // and the failed messages to 'failure'. + transferRanges(successfulRanges, REL_SUCCESS); + transferRanges(failedRanges, REL_FAILURE); + session.remove(flowFile); + getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}", + new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason}); + session.commit(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java new file mode 100644 index 0000000..8c92b1f --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.put.sender; + +import org.apache.nifi.logging.ProcessorLog; + +import java.io.IOException; +import java.nio.charset.Charset; + +/** + * Base class for sending messages over a channel. + */ +public abstract class ChannelSender { + + protected final int port; + protected final String host; + protected final int maxSendBufferSize; + protected final ProcessorLog logger; + + protected volatile int timeout = 10000; + protected volatile long lastUsed; + + public ChannelSender(final String host, final int port, final int maxSendBufferSize, final ProcessorLog logger) { + this.port = port; + this.host = host; + this.maxSendBufferSize = maxSendBufferSize; + this.logger = logger; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public int getTimeout() { + return timeout; + } + + /** + * @return the last time data was sent over this channel + */ + public long getLastUsed() { + return lastUsed; + } + + /** + * Opens the connection to the destination. + * + * @throws IOException if an error occurred opening the connection. + */ + public abstract void open() throws IOException; + + /** + * Sends the given string over the channel. + * + * @param message the message to send over the channel + * @throws IOException if there was an error communicating over the channel + */ + public void send(final String message, final Charset charset) throws IOException { + final byte[] bytes = message.getBytes(charset); + send(bytes); + } + + /** + * Sends the given data over the channel. + * + * @param data the data to send over the channel + * @throws IOException if there was an error communicating over the channel + */ + public void send(final byte[] data) throws IOException { + write(data); + lastUsed = System.currentTimeMillis(); + } + + /** + * Write the given buffer to the underlying channel. + */ + protected abstract void write(byte[] data) throws IOException; + + /** + * @return true if the underlying channel is connected + */ + public abstract boolean isConnected(); + + /** + * Close the underlying channel + */ + public abstract void close(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java new file mode 100644 index 0000000..632c6e5 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.put.sender; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ProcessorLog; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; + +/** + * Sends messages over a DatagramChannel. + */ +public class DatagramChannelSender extends ChannelSender { + + private DatagramChannel channel; + + public DatagramChannelSender(final String host, final int port, final int maxSendBufferSize, final ProcessorLog logger) { + super(host, port, maxSendBufferSize, logger); + } + + @Override + public void open() throws IOException { + if (channel == null) { + channel = DatagramChannel.open(); + + if (maxSendBufferSize > 0) { + channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize); + final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF); + if (actualSendBufSize < maxSendBufferSize) { + logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize + + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to " + + "consider changing the Operating System's maximum receive buffer"); + } + } + } + + if (!channel.isConnected()) { + channel.connect(new InetSocketAddress(InetAddress.getByName(host), port)); + } + } + + @Override + protected void write(byte[] data) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(data); + while (buffer.hasRemaining()) { + channel.write(buffer); + } + } + + @Override + public boolean isConnected() { + return channel != null && channel.isConnected(); + } + + @Override + public void close() { + IOUtils.closeQuietly(channel); + channel = null; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java new file mode 100644 index 0000000..dc85d80 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.put.sender; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; + +import javax.net.ssl.SSLContext; +import java.io.IOException; + +/** + * Sends messages over an SSLSocketChannel. + */ +public class SSLSocketChannelSender extends SocketChannelSender { + + private SSLSocketChannel sslChannel; + private SSLContext sslContext; + + public SSLSocketChannelSender(final String host, + final int port, + final int maxSendBufferSize, + final SSLContext sslContext, + final ProcessorLog logger) { + super(host, port, maxSendBufferSize, logger); + this.sslContext = sslContext; + } + + @Override + public void open() throws IOException { + if (sslChannel == null) { + super.open(); + sslChannel = new SSLSocketChannel(sslContext, channel, true); + } + sslChannel.setTimeout(timeout); + + // SSLSocketChannel will check if already connected so we can safely call this + sslChannel.connect(); + } + + @Override + protected void write(byte[] data) throws IOException { + sslChannel.write(data); + } + + @Override + public boolean isConnected() { + return sslChannel != null && !sslChannel.isClosed(); + } + + @Override + public void close() { + super.close(); + IOUtils.closeQuietly(sslChannel); + sslChannel = null; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java new file mode 100644 index 0000000..fcf341a --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */package org.apache.nifi.processor.util.put.sender; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.nio.channels.SocketChannel; + +/** + * Sends messages over a SocketChannel. + */ +public class SocketChannelSender extends ChannelSender { + + protected SocketChannel channel; + protected SocketChannelOutputStream socketChannelOutput; + + public SocketChannelSender(final String host, final int port, final int maxSendBufferSize, final ProcessorLog logger) { + super(host, port, maxSendBufferSize, logger); + } + + @Override + public void open() throws IOException { + if (channel == null) { + channel = SocketChannel.open(); + channel.configureBlocking(false); + + if (maxSendBufferSize > 0) { + channel.setOption(StandardSocketOptions.SO_SNDBUF, maxSendBufferSize); + final int actualSendBufSize = channel.getOption(StandardSocketOptions.SO_SNDBUF); + if (actualSendBufSize < maxSendBufferSize) { + logger.warn("Attempted to set Socket Send Buffer Size to " + maxSendBufferSize + + " bytes but could only set to " + actualSendBufSize + "bytes. You may want to " + + "consider changing the Operating System's maximum receive buffer"); + } + } + } + + if (!channel.isConnected()) { + final long startTime = System.currentTimeMillis(); + final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(host), port); + + if (!channel.connect(socketAddress)) { + while (!channel.finishConnect()) { + if (System.currentTimeMillis() > startTime + timeout) { + throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port); + } + + try { + Thread.sleep(50L); + } catch (final InterruptedException e) { + } + } + } + + socketChannelOutput = new SocketChannelOutputStream(channel); + socketChannelOutput.setTimeout(timeout); + } + } + + @Override + protected void write(byte[] data) throws IOException { + socketChannelOutput.write(data); + } + + @Override + public boolean isConnected() { + return channel != null && channel.isConnected(); + } + + @Override + public void close() { + IOUtils.closeQuietly(socketChannelOutput); + IOUtils.closeQuietly(channel); + socketChannelOutput = null; + channel = null; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java index 408eb59..2209e38 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java @@ -85,10 +85,27 @@ public class SSLSocketChannel implements Closeable { } public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException { - this(sslContext.createSSLEngine(), socketChannel, client); + if (!socketChannel.isConnected()) { + throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel"); + } + + this.channel = socketChannel; + + this.socketAddress = socketChannel.getRemoteAddress(); + final Socket socket = socketChannel.socket(); + this.hostname = socket.getInetAddress().getHostName(); + this.port = socket.getPort(); + + this.engine = sslContext.createSSLEngine(); + this.engine.setUseClientMode(client); + this.engine.setNeedClientAuth(true); + + streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); + streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); + appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize())); } - public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel socketChannel, final boolean client) throws IOException { + public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel socketChannel) throws IOException { if (!socketChannel.isConnected()) { throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel"); } @@ -100,9 +117,8 @@ public class SSLSocketChannel implements Closeable { this.hostname = socket.getInetAddress().getHostName(); this.port = socket.getPort(); + // don't set useClientMode or needClientAuth, use the engine as is and let the caller configure it this.engine = sslEngine; - this.engine.setUseClientMode(client); - this.engine.setNeedClientAuth(true); streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/pom.xml b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/pom.xml new file mode 100644 index 0000000..10bec1c --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-splunk-bundle</artifactId> + <version>0.6.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-splunk-nar</artifactId> + <version>0.6.0-SNAPSHOT</version> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-splunk-processors</artifactId> + <version>0.6.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <type>nar</type> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000..6b0b127 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,203 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..22c3bbe --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,24 @@ +nifi-splunk-nar +Copyright 2015-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2012 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/pom.xml b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/pom.xml new file mode 100644 index 0000000..a1dd6a0 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/pom.xml @@ -0,0 +1,68 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-splunk-bundle</artifactId> + <version>0.6.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-splunk-processors</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>com.splunk</groupId> + <artifactId>splunk</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + </dependencies> +</project>
