http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
index 0000000,b7fe97a..d9317c4
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java
@@@ -1,0 -1,323 +1,323 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.regex.Pattern;
+ 
+ import javax.servlet.Servlet;
+ import javax.ws.rs.Path;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+ import org.apache.nifi.stream.io.StreamThrottler;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import 
org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet;
+ import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet;
+ import org.apache.nifi.ssl.SSLContextService;
+ 
+ import org.eclipse.jetty.server.Connector;
+ import org.eclipse.jetty.server.HttpConfiguration;
+ import org.eclipse.jetty.server.HttpConnectionFactory;
+ import org.eclipse.jetty.server.SecureRequestCustomizer;
+ import org.eclipse.jetty.server.Server;
+ import org.eclipse.jetty.server.ServerConnector;
+ import org.eclipse.jetty.server.SslConnectionFactory;
+ import org.eclipse.jetty.servlet.ServletContextHandler;
+ import org.eclipse.jetty.util.ssl.SslContextFactory;
+ import org.eclipse.jetty.util.thread.QueuedThreadPool;
+ 
+ @Tags({"ingest", "http", "https", "rest", "listen"})
+ @CapabilityDescription("Starts an HTTP Server that is used to receive 
FlowFiles from remote sources. The URL of the Service will be 
http://{hostname}:{port}/contentListener";)
+ public class ListenHTTP extends AbstractSessionFactoryProcessor {
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
+     public static final Relationship RELATIONSHIP_SUCCESS = new 
Relationship.Builder()
+             .name("success")
+             .description("Relationship for successfully received FlowFiles")
+             .build();
+ 
+     public static final PropertyDescriptor PORT = new 
PropertyDescriptor.Builder()
+             .name("Listening Port")
+             .description("The Port to listen on for incoming connections")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new 
PropertyDescriptor.Builder()
+             .name("Authorized DN Pattern")
+             .description("A Regular Expression to apply against the 
Distinguished Name of incoming connections. If the Pattern does not match the 
DN, the connection will be refused.")
+             .required(true)
+             .defaultValue(".*")
+             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new 
PropertyDescriptor.Builder()
+             .name("Max Unconfirmed Flowfile Time")
+             .description("The maximum amount of time to wait for a FlowFile 
to be confirmed before it is removed from the cache")
+             .required(true)
+             .defaultValue("60 secs")
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor MAX_DATA_RATE = new 
PropertyDescriptor.Builder()
+             .name("Max Data to Receive per Second")
+             .description("The maximum amount of data to receive per second; 
this allows the bandwidth to be throttled to a specified data rate; if not 
specified, the data rate is not throttled")
+             .required(false)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+             .name("SSL Context Service")
+             .description("The Controller Service to use in order to obtain an 
SSL Context")
+             .required(false)
+             .identifiesControllerService(SSLContextService.class)
+             .build();
+     public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new 
PropertyDescriptor.Builder()
+                   .name("HTTP Headers to receive as Attributes (Regex)")
+                   .description("Specifies the Regular Expression that 
determines the names of HTTP Headers that should be passed along as FlowFile 
attributes")
+                   
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+                   .required(false)
+                   .build();
+ 
+     public static final String URI = "/contentListener";
+     public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor";
+     public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger";
+     public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = 
"sessionFactoryHolder";
+     public static final String CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = 
"processContextHolder";
+     public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = 
"authorityPattern";
+     public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = 
"headerPattern";
+     public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap";
+     public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = 
"streamThrottler";
+ 
+     private volatile Server server = null;
+     private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap 
= new ConcurrentHashMap<String, FlowFileEntryTimeWrapper>();
+     private final AtomicReference<ProcessSessionFactory> 
sessionFactoryReference = new AtomicReference<>();
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(RELATIONSHIP_SUCCESS);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(PORT);
+         descriptors.add(MAX_DATA_RATE);
+         descriptors.add(SSL_CONTEXT_SERVICE);
+         descriptors.add(AUTHORIZED_DN_PATTERN);
+         descriptors.add(MAX_UNCONFIRMED_TIME);
+         descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX);
+         this.properties = Collections.unmodifiableList(descriptors);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @OnStopped
+     public void shutdownHttpServer() {
+         final Server toShutdown = this.server;
+         if (toShutdown == null) {
+             return;
+         }
+ 
+         try {
+             toShutdown.stop();
+             toShutdown.destroy();
+         } catch (final Exception ex) {
+             getLogger().warn("unable to cleanly shutdown embedded server due 
to {}", new Object[]{ex});
+             this.server = null;
+         }
+     }
+ 
+     private void createHttpServerFromService(final ProcessContext context) 
throws Exception {
+         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+         final Double maxBytesPerSecond = 
context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
+         final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? 
null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue());
+ 
+         final boolean needClientAuth = sslContextService == null ? false : 
sslContextService.getTrustStoreFile() != null;
+ 
+         final SslContextFactory contextFactory = new SslContextFactory();
+         contextFactory.setNeedClientAuth(needClientAuth);
+ 
+         if (needClientAuth) {
+             
contextFactory.setTrustStorePath(sslContextService.getTrustStoreFile());
+             
contextFactory.setTrustStoreType(sslContextService.getTrustStoreType());
+             
contextFactory.setTrustStorePassword(sslContextService.getTrustStorePassword());
+         }
+ 
+         final String keystorePath = sslContextService == null ? null : 
sslContextService.getKeyStoreFile();
+         if (keystorePath != null) {
+             final String keystorePassword = 
sslContextService.getKeyStorePassword();
+             final String keyStoreType = sslContextService.getKeyStoreType();
+ 
+             contextFactory.setKeyStorePath(keystorePath);
+             contextFactory.setKeyManagerPassword(keystorePassword);
+             contextFactory.setKeyStorePassword(keystorePassword);
+             contextFactory.setKeyStoreType(keyStoreType);
+         }
+ 
+         // thread pool for the jetty instance
+         final QueuedThreadPool threadPool = new QueuedThreadPool();
+         threadPool.setName(String.format("%s (%s) Web Server", 
getClass().getSimpleName(), getIdentifier()));
+ 
+         // create the server instance
+         final Server server = new Server(threadPool);
+ 
+         // get the configured port
+         final int port = context.getProperty(PORT).asInteger();
+ 
+         final ServerConnector connector;
+         final HttpConfiguration httpConfiguration = new HttpConfiguration();
+         if (keystorePath == null) {
+             // create the connector
+             connector = new ServerConnector(server, new 
HttpConnectionFactory(httpConfiguration));
+         } else {
+             // configure the ssl connector
+             httpConfiguration.setSecureScheme("https");
+             httpConfiguration.setSecurePort(port);
+             httpConfiguration.addCustomizer(new SecureRequestCustomizer());
+ 
+             // build the connector
+             connector = new ServerConnector(server,
+                     new SslConnectionFactory(contextFactory, "http/1.1"),
+                     new HttpConnectionFactory(httpConfiguration));
+         }
+ 
+         // configure the port
+         connector.setPort(port);
+ 
+         // add the connector to the server
+         server.setConnectors(new Connector[]{connector});
+ 
+         final ServletContextHandler contextHandler = new 
ServletContextHandler(server, "/", true, (keystorePath != null));
+         for (final Class<? extends Servlet> cls : getServerClasses()) {
+             final Path path = cls.getAnnotation(Path.class);
+             if (path == null) {
+                 contextHandler.addServlet(cls, "/*");
+             } else {
+                 contextHandler.addServlet(cls, path.value());
+             }
+         }
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this);
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger());
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, 
sessionFactoryReference);
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, 
context);
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, 
flowFileMap);
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, 
Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue()));
+         contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, 
streamThrottler);
+ 
+         if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) {
+               contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, 
Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue()));
+         }
+         server.start();
+ 
+         this.server = server;
+     }
+ 
+     @OnScheduled
+     public void createHttpServer(final ProcessContext context) throws 
Exception {
+         createHttpServerFromService(context);
+     }
+ 
+     protected Set<Class<? extends Servlet>> getServerClasses() {
+         final Set<Class<? extends Servlet>> s = new HashSet<>();
+         s.add(ListenHTTPServlet.class);
+         s.add(ContentAcknowledgmentServlet.class);
+         return s;
+     }
+ 
+     private Set<String> findOldFlowFileIds(final ProcessContext ctx) {
+         final Set<String> old = new HashSet<>();
+ 
+         final long expiryMillis = 
ctx.getProperty(MAX_UNCONFIRMED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+         final long cutoffTime = System.currentTimeMillis() - expiryMillis;
+         for (final Map.Entry<String, FlowFileEntryTimeWrapper> entry : 
flowFileMap.entrySet()) {
+             final FlowFileEntryTimeWrapper wrapper = entry.getValue();
+             if (wrapper != null && wrapper.getEntryTime() < cutoffTime) {
+                 old.add(entry.getKey());
+             }
+         }
+ 
+         return old;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+         sessionFactoryReference.compareAndSet(null, sessionFactory);
+ 
+         for (final String id : findOldFlowFileIds(context)) {
+             final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id);
+             if (wrapper != null) {
+                 getLogger().warn("failed to received acknowledgment for HOLD 
with ID {}; rolling back session", new Object[]{id});
+                 wrapper.session.rollback();
+             }
+         }
+ 
+         context.yield();
+     }
+ 
+     public static class FlowFileEntryTimeWrapper {
+ 
+         private final Set<FlowFile> flowFiles;
+         private final long entryTime;
+         private final ProcessSession session;
+ 
+         public FlowFileEntryTimeWrapper(final ProcessSession session, final 
Set<FlowFile> flowFiles, final long entryTime) {
+             this.flowFiles = flowFiles;
+             this.entryTime = entryTime;
+             this.session = session;
+         }
+ 
+         public Set<FlowFile> getFlowFiles() {
+             return flowFiles;
+         }
+ 
+         public long getEntryTime() {
+             return entryTime;
+         }
+ 
+         public ProcessSession getSession() {
+             return session;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index 0000000,43d8395..561e333
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@@ -1,0 -1,627 +1,627 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.net.InetAddress;
+ import java.net.NetworkInterface;
+ import java.net.SocketException;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Enumeration;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.expression.AttributeExpression;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.io.nio.BufferPool;
+ import org.apache.nifi.io.nio.ChannelListener;
+ import org.apache.nifi.io.nio.consumer.StreamConsumer;
+ import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.OnUnscheduled;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
++import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.standard.util.UDPStreamConsumer;
+ import org.apache.nifi.util.Tuple;
+ 
+ import org.apache.commons.lang3.StringUtils;
+ 
+ /**
+  * <p>
+  * This processor listens for Datagram Packets on a given port and 
concatenates
+  * the contents of those packets together generating flow files roughly as 
often
+  * as the internal buffer fills up or until no more data is currently 
available.
+  * </p>
+  *
+  * <p>
+  * This processor has the following required properties:
+  * <ul>
+  * <li><b>Port</b> - The port to listen on for data packets. Must be known by
+  * senders of Datagrams.</li>
+  * <li><b>Receive Timeout</b> - The time out period when waiting to receive 
data
+  * from the socket. Specify units. Default is 5 secs.</li>
+  * <li><b>Max Buffer Size</b> - Determines the size each receive buffer may 
be.
+  * Specify units. Default is 1 MB.</li>
+  * <li><b>FlowFile Size Trigger</b> - Determines the (almost) upper bound size
+  * at which a flow file would be generated. A flow file will get made even if
+  * this value isn't reached if there is no more data streaming in and this 
value
+  * may be exceeded by the size of a single packet. Specify units. Default is 1
+  * MB.</li>
+  * <li><b>Max size of UDP Buffer</b> - The maximum UDP buffer size that should
+  * be used. This is a suggestion to the Operating System to indicate how big 
the
+  * udp socket buffer should be. Specify units. Default is 1 MB.")</li>
+  * <li><b>Receive Buffer Count</b> - Number of receiving buffers to be used to
+  * accept data from the socket. Higher numbers means more ram is allocated but
+  * can allow better throughput. Default is 4.</li>
+  * <li><b>Channel Reader Interval</b> - Scheduling interval for each read
+  * channel. Specify units. Default is 50 millisecs.</li>
+  * <li><b>FlowFiles Per Session</b> - The number of flow files per session.
+  * Higher number is more efficient, but will lose more data if a problem 
occurs
+  * that causes a rollback of a session. Default is 10</li>
+  * </ul>
+  * </p>
+  *
+  * This processor has the following optional properties:
+  * <ul>
+  * <li><b>Sending Host</b> - IP, or name, of a remote host. Only Datagrams 
from
+  * the specified Sending Host Port and this host will be accepted. Improves
+  * Performance. May be a system property or an environment variable.</li>
+  * <li><b>Sending Host Port</b> - Port being used by remote host to send
+  * Datagrams. Only Datagrams from the specified Sending Host and this port 
will
+  * be accepted. Improves Performance. May be a system property or an 
environment
+  * variable.</li>
+  * </ul>
+  * </p>
+  *
+  * <p>
+  * The following relationships are required:
+  * <ul>
+  * <li><b>success</b> - Where to route newly created flow files.</li>
+  * </ul>
+  * </p>
+  *
+  */
+ @TriggerWhenEmpty
+ @Tags({"ingest", "udp", "listen", "source"})
+ @CapabilityDescription("Listens for Datagram Packets on a given port and 
concatenates the contents of those packets "
+         + "together generating flow files")
+ public class ListenUDP extends AbstractSessionFactoryProcessor {
+ 
+     private static final Set<Relationship> relationships;
+     private static final List<PropertyDescriptor> properties;
+ 
+     // relationships.
+     public static final Relationship RELATIONSHIP_SUCCESS = new 
Relationship.Builder()
+             .name("success")
+             .description("Connection which contains concatenated Datagram 
Packets")
+             .build();
+ 
+     static {
+         Set<Relationship> rels = new HashSet<>();
+         rels.add(RELATIONSHIP_SUCCESS);
+         relationships = Collections.unmodifiableSet(rels);
+     }
+     // required properties.
+     public static final PropertyDescriptor PORT = new 
PropertyDescriptor.Builder()
+             .name("Port")
+             .description("Port to listen on. Must be known by senders of 
Datagrams.")
+             .addValidator(StandardValidators.PORT_VALIDATOR)
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor RECV_TIMEOUT = new 
PropertyDescriptor.Builder()
+             .name("Receive Timeout")
+             .description("The time out period when waiting to receive data 
from the socket. Specify units.")
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .defaultValue("5 secs")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+             .name("Max Buffer Size")
+             .description("Determines the size each receive buffer may be")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor FLOW_FILE_SIZE_TRIGGER = new 
PropertyDescriptor.Builder()
+             .name("FlowFile Size Trigger")
+             .description("Determines the (almost) upper bound size at which a 
flow file would be generated.")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor MAX_UDP_BUFFER = new 
PropertyDescriptor.Builder()
+             .name("Max size of UDP Buffer")
+             .description("The maximum UDP buffer size that should be used. 
This is a suggestion to the Operating System "
+                     + "to indicate how big the udp socket buffer should be.")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor RECV_BUFFER_COUNT = new 
PropertyDescriptor.Builder()
+             .name("Receive Buffer Count")
+             .description("Number of receiving buffers to be used to accept 
data from the socket. Higher numbers "
+                     + "means more ram is allocated but can allow better 
throughput.")
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .defaultValue("4")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor CHANNEL_READER_PERIOD = new 
PropertyDescriptor.Builder()
+             .name("Channel Reader Interval")
+             .description("Scheduling interval for each read channel.")
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .defaultValue("50 ms")
+             .required(true)
+             .build();
+ 
+     public static final PropertyDescriptor FLOW_FILES_PER_SESSION = new 
PropertyDescriptor.Builder()
+             .name("FlowFiles Per Session")
+             .description("The number of flow files per session.")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .defaultValue("10")
+             .build();
+ 
+     // optional properties.
+     public static final PropertyDescriptor SENDING_HOST = new 
PropertyDescriptor.Builder()
+             .name("Sending Host")
+             .description("IP, or name, of a remote host. Only Datagrams from 
the specified Sending Host Port and this host will "
+                     + "be accepted. Improves Performance. May be a system 
property or an environment variable.")
+             .addValidator(new HostValidator())
+             .expressionLanguageSupported(true)
+             .build();
+ 
+     public static final PropertyDescriptor SENDING_HOST_PORT = new 
PropertyDescriptor.Builder()
+             .name("Sending Host Port")
+             .description("Port being used by remote host to send Datagrams. 
Only Datagrams from the specified Sending Host and "
+                     + "this port will be accepted. Improves Performance. May 
be a system property or an environment variable.")
+             .addValidator(StandardValidators.PORT_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+ 
+     private static final Set<String> interfaceSet = new HashSet<>();
+ 
+     static {
+         try {
+             final Enumeration<NetworkInterface> interfaceEnum
+                     = NetworkInterface.getNetworkInterfaces();
+             while (interfaceEnum.hasMoreElements()) {
+                 final NetworkInterface ifc = interfaceEnum.nextElement();
+                 interfaceSet.add(ifc.getName());
+             }
+         } catch (SocketException e) {
+         }
+     }
+     public static final PropertyDescriptor NETWORK_INTF_NAME = new 
PropertyDescriptor.Builder()
+             .name("Local Network Interface")
+             .description("The name of a local network interface to be used to 
restrict listening for UDP Datagrams to a specific LAN."
+                     + "May be a system property or an environment variable.")
+             .addValidator(new Validator() {
+                 @Override
+                 public ValidationResult validate(String subject, String 
input, ValidationContext context) {
+                     ValidationResult result = new ValidationResult.Builder()
+                     .subject("Local Network Interface")
+                     .valid(true)
+                     .input(input)
+                     .build();
+                     if (interfaceSet.contains(input.toLowerCase())) {
+                         return result;
+                     }
+ 
+                     String message;
+                     try {
+                         AttributeExpression ae = 
context.newExpressionLanguageCompiler().compile(input);
+                         String realValue = ae.evaluate();
+                         if (interfaceSet.contains(realValue.toLowerCase())) {
+                             return result;
+                         }
+ 
+                         message = realValue + " is not a valid network name. 
Valid names are " + interfaceSet.toString();
+ 
+                     } catch (IllegalArgumentException e) {
+                         message = "Not a valid AttributeExpression: " + 
e.getMessage();
+                     }
+                     result = new ValidationResult.Builder()
+                     .subject("Local Network Interface")
+                     .valid(false)
+                     .input(input)
+                     .explanation(message)
+                     .build();
+ 
+                     return result;
+                 }
+             })
+             .expressionLanguageSupported(true)
+             .build();
+ 
+     static {
+         List<PropertyDescriptor> props = new ArrayList<>();
+         props.add(SENDING_HOST);
+         props.add(SENDING_HOST_PORT);
+         props.add(NETWORK_INTF_NAME);
+         props.add(CHANNEL_READER_PERIOD);
+         props.add(FLOW_FILE_SIZE_TRIGGER);
+         props.add(MAX_BUFFER_SIZE);
+         props.add(MAX_UDP_BUFFER);
+         props.add(PORT);
+         props.add(RECV_BUFFER_COUNT);
+         props.add(FLOW_FILES_PER_SESSION);
+         props.add(RECV_TIMEOUT);
+         properties = Collections.unmodifiableList(props);
+     }
+     // defaults
+     public static final int DEFAULT_LISTENING_THREADS = 2;
+     // lock used to protect channelListener
+     private final Lock lock = new ReentrantLock();
+     private volatile ChannelListener channelListener = null;
+     private final BlockingQueue<Tuple<ProcessSession, List<FlowFile>>> 
flowFilesPerSessionQueue = new LinkedBlockingQueue<>();
+     private final List<FlowFile> newFlowFiles = new ArrayList<>();
+     private final AtomicReference<UDPStreamConsumer> consumerRef = new 
AtomicReference<>();
+     private final AtomicBoolean stopping = new AtomicBoolean(false);
+     private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = 
new AtomicReference<>();
+     private final ExecutorService consumerExecutorService = 
Executors.newSingleThreadExecutor();
+     private final AtomicReference<Future<Tuple<ProcessSession, 
List<FlowFile>>>> consumerFutureRef = new AtomicReference<>();
+     private final AtomicBoolean resetChannelListener = new 
AtomicBoolean(false);
+     // instance attribute for provenance receive event generation
+     private volatile String sendingHost;
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     /**
+      * Create the ChannelListener and a thread that causes the Consumer to
+      * create flow files.
+      *
+      * @param context
+      * @throws IOException
+      */
+     @OnScheduled
+     public void initializeChannelListenerAndConsumerProcessing(final 
ProcessContext context) throws IOException {
+         getChannelListener(context);
+         stopping.set(false);
+         Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = 
consumerExecutorService
+                 .submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() 
{
+ 
+                     @Override
+                     public Tuple<ProcessSession, List<FlowFile>> call() {
+                         final int maxFlowFilesPerSession = 
context.getProperty(FLOW_FILES_PER_SESSION).asInteger();
+                         final long channelReaderIntervalMSecs = 
context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+                         // number of waits in 5 secs, or 1
+                         final int maxWaits = (int) 
(channelReaderIntervalMSecs <= 1000 ? 5000 / channelReaderIntervalMSecs : 1);
+                         final ProcessorLog logger = getLogger();
+                         int flowFileCount = maxFlowFilesPerSession;
+                         ProcessSession session = null;
+                         int numWaits = 0;
+                         while (!stopping.get()) {
+                             UDPStreamConsumer consumer = consumerRef.get();
+                             if (consumer == null || sessionFactoryRef.get() 
== null) {
+                                 try {
+                                     Thread.sleep(100L);
+                                 } catch (InterruptedException swallow) {
+                                 }
+                             } else {
+                                 try {
+                                     // first time through, flowFileCount is 
maxFlowFilesPerSession so that a session
+                                     // is created and the consumer is updated 
with it.
+                                     if (flowFileCount == 
maxFlowFilesPerSession || numWaits == maxWaits) {
+                                         logger.debug("Have waited {} times", 
new Object[]{numWaits});
+                                         numWaits = 0;
+                                         if (session != null) {
+                                             Tuple<ProcessSession, 
List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(
+                                                     session,
+                                                     new 
ArrayList<>(newFlowFiles));
+                                             newFlowFiles.clear();
+                                             
flowFilesPerSessionQueue.add(flowFilesPerSession);
+                                         }
+                                         session = 
sessionFactoryRef.get().createSession();
+                                         consumer.setSession(session);
+                                         flowFileCount = 0;
+                                     }
+                                     // this will throttle the processing of 
the received datagrams. If there are no more
+                                     // buffers to read into because none have 
been returned to the pool via consumer.process(),
+                                     // then the desired back pressure on the 
channel is created.
+                                     if 
(context.getAvailableRelationships().size() > 0) {
+                                         consumer.process();
+                                         if (flowFileCount == 
newFlowFiles.size()) {
+                                             // no new datagrams received, 
need to throttle this thread back so it does
+                                             // not consume all cpu...but 
don't want to cause back pressure on the channel
+                                             // so the sleep time is same as 
the reader interval
+                                             // If have done this for approx. 
5 secs, assume datagram sender is down. So, push
+                                             // out the remaining flow files 
(see numWaits == maxWaits above)
+                                             
Thread.sleep(channelReaderIntervalMSecs);
+                                             if (flowFileCount > 0) {
+                                                 numWaits++;
+                                             }
+                                         } else {
+                                             flowFileCount = 
newFlowFiles.size();
+                                         }
+                                     } else {
+                                         logger.debug("Creating back 
pressure...no available destinations");
+                                         Thread.sleep(1000L);
+                                     }
+                                 } catch (final IOException ioe) {
+                                     logger.error("Unable to fully process 
consumer {}", new Object[]{consumer}, ioe);
+                                 } catch (InterruptedException e) {
+                                     // don't care
+                                 } finally {
+                                     if (consumer.isConsumerFinished()) {
+                                         logger.info("Consumer {} was closed 
and is finished", new Object[]{consumer});
+                                         consumerRef.set(null);
+                                         disconnect();
+                                         if (!stopping.get()) {
+                                             resetChannelListener.set(true);
+                                         }
+                                     }
+                                 }
+                             }
+                         }
+                         // when shutting down, need consumer to drain rest of 
cached buffers and clean up.
+                         // prior to getting here, the channelListener was 
shutdown
+                         UDPStreamConsumer consumer;
+                         while ((consumer = consumerRef.get()) != null && 
!consumer.isConsumerFinished()) {
+                             try {
+                                 consumer.process();
+                             } catch (IOException swallow) {
+                                 // if this is 
blown...consumer.isConsumerFinished will be true
+                             }
+                         }
+                         Tuple<ProcessSession, List<FlowFile>> 
flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(session,
+                                 new ArrayList<>(newFlowFiles));
+                         return flowFilesPerSession;
+                     }
+                 });
+         consumerFutureRef.set(consumerFuture);
+     }
+ 
+     private void disconnect() {
+         if (lock.tryLock()) {
+             try {
+                 if (channelListener != null) {
+                     getLogger().debug("Shutting down channel listener {}", 
new Object[]{channelListener});
+                     channelListener.shutdown(500L, TimeUnit.MILLISECONDS);
+                     channelListener = null;
+                 }
+             } finally {
+                 lock.unlock();
+             }
+         }
+     }
+ 
+     private void getChannelListener(final ProcessContext context) throws 
IOException {
+         if (lock.tryLock()) {
+             try {
+                 ProcessorLog logger = getLogger();
+                 logger.debug("Instantiating a new channel listener");
+                 final int port = context.getProperty(PORT).asInteger();
+                 final int bufferCount = 
context.getProperty(RECV_BUFFER_COUNT).asInteger();
+                 final Double bufferSize = 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B);
+                 final Double rcvBufferSize = 
context.getProperty(MAX_UDP_BUFFER).asDataSize(DataUnit.B);
+                 sendingHost = 
context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
+                 final Integer sendingHostPort = 
context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
+                 final String nicIPAddressStr = 
context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+                 final Double flowFileSizeTrigger = 
context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B);
+                 final int recvTimeoutMS = 
context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+                 final StreamConsumerFactory consumerFactory = new 
StreamConsumerFactory() {
+ 
+                     @Override
+                     public StreamConsumer newInstance(final String streamId) {
+                         final UDPStreamConsumer consumer = new 
UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), 
getLogger());
+                         consumerRef.set(consumer);
+                         return consumer;
+                     }
+                 };
+                 final int readerMilliseconds = 
context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+                 final BufferPool bufferPool = new BufferPool(bufferCount, 
bufferSize.intValue(), false, Integer.MAX_VALUE);
+                 channelListener = new 
ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, 
recvTimeoutMS, TimeUnit.MILLISECONDS);
+                 // specifying a sufficiently low number for each stream to be 
fast enough though very efficient
+                 
channelListener.setChannelReaderSchedulingPeriod(readerMilliseconds, 
TimeUnit.MILLISECONDS);
+                 InetAddress nicIPAddress = null;
+                 if (null != nicIPAddressStr) {
+                     NetworkInterface netIF = 
NetworkInterface.getByName(nicIPAddressStr);
+                     nicIPAddress = netIF.getInetAddresses().nextElement();
+                 }
+                 channelListener.addDatagramChannel(nicIPAddress, port, 
rcvBufferSize.intValue(), sendingHost, sendingHostPort);
+                 logger.info("Registered service and initialized UDP socket 
listener. Now listening on port " + port + "...");
+             } finally {
+                 lock.unlock();
+             }
+         }
+     }
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+         Collection<ValidationResult> result = new ArrayList<>();
+         String sendingHost = 
validationContext.getProperty(SENDING_HOST).getValue();
+         String sendingPort = 
validationContext.getProperty(SENDING_HOST_PORT).getValue();
+         if (StringUtils.isBlank(sendingHost) && 
StringUtils.isNotBlank(sendingPort)) {
+             result.add(new ValidationResult.Builder()
+                     .subject(SENDING_HOST.getName())
+                     .valid(false)
+                     .explanation("Must specify Sending Host when specifying 
Sending Host Port")
+                     .build());
+         } else if (StringUtils.isBlank(sendingPort) && 
StringUtils.isNotBlank(sendingHost)) {
+             result.add(new ValidationResult.Builder()
+                     .subject(SENDING_HOST_PORT.getName())
+                     .valid(false)
+                     .explanation("Must specify Sending Host Port when 
specifying Sending Host")
+                     .build());
+         }
+         return result;
+     }
+ 
+     @Override
+     public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
+         final ProcessorLog logger = getLogger();
+         sessionFactoryRef.compareAndSet(null, sessionFactory);
+         if (resetChannelListener.getAndSet(false) && !stopping.get()) {
+             try {
+                 getChannelListener(context);
+             } catch (IOException e) {
+                 logger.error("Tried to reset Channel Listener and failed due 
to:", e);
+                 resetChannelListener.set(true);
+             }
+         }
+ 
+         transferFlowFiles();
+     }
+ 
+     private boolean transferFlowFiles() {
+         final ProcessorLog logger = getLogger();
+         ProcessSession session;
+         Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = null;
+         boolean transferred = false;
+         try {
+             flowFilesPerSession = flowFilesPerSessionQueue.poll(100L, 
TimeUnit.MILLISECONDS);
+         } catch (InterruptedException e) {
+         }
+         if (flowFilesPerSession != null) {
+             session = flowFilesPerSession.getKey();
+             List<FlowFile> flowFiles = flowFilesPerSession.getValue();
+             String sourceSystem = sendingHost == null ? "Unknown" : 
sendingHost;
+             try {
+                 for (FlowFile flowFile : flowFiles) {
+                     session.getProvenanceReporter().receive(flowFile, 
sourceSystem);
+                     session.transfer(flowFile, RELATIONSHIP_SUCCESS);
+                 }
+                 logger.info("Transferred flow files {} to success", new 
Object[]{flowFiles});
+                 transferred = true;
+ 
+                 // need to check for erroneous flow files in input queue
+                 List<FlowFile> existingFlowFiles = session.get(10);
+                 for (FlowFile existingFlowFile : existingFlowFiles) {
+                     if (existingFlowFile != null && 
existingFlowFile.getSize() > 0) {
+                         session.transfer(existingFlowFile, 
RELATIONSHIP_SUCCESS);
+                         logger.warn("Found flow file in input queue 
(shouldn't have). Transferred flow file {} to success",
+                                 new Object[]{existingFlowFile});
+                     } else if (existingFlowFile != null) {
+                         session.remove(existingFlowFile);
+                         logger.warn("Found empty flow file in input queue 
(shouldn't have). Removed flow file {}", new Object[]{existingFlowFile});
+                     }
+                 }
+                 session.commit();
+             } catch (Throwable t) {
+                 session.rollback();
+                 logger.error("Failed to transfer flow files or commit 
session...rolled back", t);
+                 throw t;
+             }
+         }
+         return transferred;
+     }
+ 
+     @OnUnscheduled
+     public void stopping() {
+         getLogger().debug("Stopping Processor");
+         disconnect();
+         stopping.set(true);
+         Future<Tuple<ProcessSession, List<FlowFile>>> future;
+         Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession;
+         if ((future = consumerFutureRef.getAndSet(null)) != null) {
+             try {
+                 flowFilesPerSession = future.get();
+                 if (flowFilesPerSession.getValue().size() > 0) {
+                     getLogger().debug("Draining remaining flow Files when 
stopping");
+                     flowFilesPerSessionQueue.add(flowFilesPerSession);
+                 } else {
+                     // need to close out the session that has no flow files
+                     flowFilesPerSession.getKey().commit();
+                 }
+             } catch (InterruptedException | ExecutionException e) {
+                 getLogger().error("Failure in cleaning up!", e);
+             }
+             boolean moreFiles = true;
+             while (moreFiles) {
+                 try {
+                     moreFiles = transferFlowFiles();
+                 } catch (Throwable t) {
+                     getLogger().error("Problem transferring cached 
flowfiles", t);
+                 }
+             }
+         }
+     }
+ 
+     @OnStopped
+     public void stopped() {
+         sessionFactoryRef.set(null);
+     }
+ 
+     public static class HostValidator implements Validator {
+ 
+         @Override
+         public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+             try {
+                 InetAddress.getByName(input);
+                 return new ValidationResult.Builder()
+                         .subject(subject)
+                         .valid(true)
+                         .input(input)
+                         .build();
+             } catch (final UnknownHostException e) {
+                 return new ValidationResult.Builder()
+                         .subject(subject)
+                         .valid(false)
+                         .input(input)
+                         .explanation("Unknown host: " + e)
+                         .build();
+             }
+         }
+ 
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
index 0000000,9708c49..daf513b
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogAttribute.java
@@@ -1,0 -1,262 +1,262 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.TreeSet;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ import org.apache.commons.io.IOUtils;
+ import org.apache.commons.lang3.StringUtils;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"attributes", "logging"})
+ public class LogAttribute extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor LOG_LEVEL = new 
PropertyDescriptor.Builder()
+             .name("Log Level")
+             .required(true)
+             .description("The Log Level to use when logging the Attributes")
+             .allowableValues(DebugLevels.values())
+             .defaultValue("info")
+             .build();
+     public static final PropertyDescriptor ATTRIBUTES_TO_LOG_CSV = new 
PropertyDescriptor.Builder()
+             .name("Attributes to Log")
+             .required(false)
+             .description("A comma-separated list of Attributes to Log. If not 
specified, all attributes will be logged.")
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor ATTRIBUTES_TO_IGNORE_CSV = new 
PropertyDescriptor.Builder()
+             .name("Attributes to Ignore")
+             .description("A comma-separated list of Attributes to ignore. If 
not specified, no attributes will be ignored.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor LOG_PAYLOAD = new 
PropertyDescriptor.Builder()
+             .name("Log Payload")
+             .required(true)
+             .description("If true, the FlowFile's payload will be logged, in 
addition to its attributes; otherwise, just the Attributes will be logged.")
+             .defaultValue("false")
+             .allowableValues("true", "false")
+             .build();
+ 
+     public static final String FIFTY_DASHES = 
"--------------------------------------------------";
+ 
+     public static enum DebugLevels {
+ 
+         trace, debug, info, warn, error
+     }
+ 
+     public static final long ONE_MB = 1024 * 1024;
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> supportedDescriptors;
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("All FlowFiles are routed to 
this relationship").build();
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> procRels = new HashSet<>();
+         procRels.add(REL_SUCCESS);
+         relationships = Collections.unmodifiableSet(procRels);
+ 
+         // descriptors
+         final List<PropertyDescriptor> supDescriptors = new ArrayList<>();
+         supDescriptors.add(LOG_LEVEL);
+         supDescriptors.add(LOG_PAYLOAD);
+         supDescriptors.add(ATTRIBUTES_TO_LOG_CSV);
+         supDescriptors.add(ATTRIBUTES_TO_IGNORE_CSV);
+         supportedDescriptors = Collections.unmodifiableList(supDescriptors);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return supportedDescriptors;
+     }
+ 
+     protected String processFlowFile(final ProcessorLog logger, final 
DebugLevels logLevel, final FlowFile flowFile, final ProcessSession session, 
final ProcessContext context) {
+         final Set<String> attributeKeys = 
getAttributesToLog(flowFile.getAttributes().keySet(), context);
+         final ProcessorLog LOG = getLogger();
+ 
+         // Pretty print metadata
+         final StringBuilder message = new StringBuilder();
+         message.append("logging for flow file ").append(flowFile);
+         message.append("\n");
+         message.append(FIFTY_DASHES);
+         message.append("\nStandard FlowFile Attributes");
+         message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", 
"entryDate", new Date(flowFile.getEntryDate())));
+         message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", 
"lineageStartDate", new Date(flowFile.getLineageStartDate())));
+         message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", 
"fileSize", flowFile.getSize()));
+         message.append("\nFlowFile Attribute Map Content");
+         for (final String key : attributeKeys) {
+             message.append(String.format("\nKey: '%1$s'\n\tValue: '%2$s'", 
key, flowFile.getAttribute(key)));
+         }
+         message.append("\n");
+         message.append(FIFTY_DASHES);
+ 
+         // The user can request to log the payload
+         final boolean logPayload = 
context.getProperty(LOG_PAYLOAD).asBoolean();
+         if (logPayload) {
+             message.append("\n");
+             if (flowFile.getSize() < ONE_MB) {
+                 final FlowFilePayloadCallback callback = new 
FlowFilePayloadCallback();
+                 session.read(flowFile, callback);
+                 message.append(callback.getContents());
+             } else {
+                 message.append("\n Not including payload since it is larger 
than one mb.");
+             }
+         }
+         final String outputMessage = message.toString().trim();
+         // Uses optional property to specify logging level
+         switch (logLevel) {
+             case info:
+                 LOG.info(outputMessage);
+                 break;
+             case debug:
+                 LOG.debug(outputMessage);
+                 break;
+             case warn:
+                 LOG.warn(outputMessage);
+                 break;
+             case trace:
+                 LOG.trace(outputMessage);
+                 break;
+             case error:
+                 LOG.error(outputMessage);
+                 break;
+             default:
+                 LOG.debug(outputMessage);
+         }
+ 
+         return outputMessage;
+ 
+     }
+ 
+     private Set<String> getAttributesToLog(final Set<String> 
flowFileAttrKeys, final ProcessContext context) {
+         final Set<String> result = new TreeSet<>();
+ 
+         final String attrsToLogValue = 
context.getProperty(ATTRIBUTES_TO_LOG_CSV).getValue();
+         if (StringUtils.isBlank(attrsToLogValue)) {
+             result.addAll(flowFileAttrKeys);
+         } else {
+             result.addAll(Arrays.asList(attrsToLogValue.split("\\s*,\\s*")));
+         }
+ 
+         final String attrsToRemoveValue = 
context.getProperty(ATTRIBUTES_TO_IGNORE_CSV).getValue();
+         if (StringUtils.isNotBlank(attrsToRemoveValue)) {
+             
result.removeAll(Arrays.asList(attrsToRemoveValue.split("\\s*,\\s*")));
+         }
+ 
+         return result;
+     }
+ 
+     private void transferChunk(final ProcessSession session) {
+         final List<FlowFile> flowFiles = session.get(50);
+         if (!flowFiles.isEmpty()) {
+             session.transfer(flowFiles, REL_SUCCESS);
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         final String logLevelValue = 
context.getProperty(LOG_LEVEL).getValue().toLowerCase();
+ 
+         final DebugLevels logLevel;
+         try {
+             logLevel = DebugLevels.valueOf(logLevelValue);
+         } catch (Exception e) {
+             throw new ProcessException(e);
+         }
+ 
+         final ProcessorLog LOG = getLogger();
+         boolean isLogLevelEnabled = false;
+         switch (logLevel) {
+             case trace:
+                 isLogLevelEnabled = LOG.isTraceEnabled();
+                 break;
+             case debug:
+                 isLogLevelEnabled = LOG.isDebugEnabled();
+                 break;
+             case info:
+                 isLogLevelEnabled = LOG.isInfoEnabled();
+                 break;
+             case warn:
+                 isLogLevelEnabled = LOG.isWarnEnabled();
+                 break;
+             case error:
+                 isLogLevelEnabled = LOG.isErrorEnabled();
+                 break;
+         }
+ 
+         if (!isLogLevelEnabled) {
+             transferChunk(session);
+             return;
+         }
+ 
+         final FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         processFlowFile(LOG, logLevel, flowFile, session, context);
+         session.transfer(flowFile, REL_SUCCESS);
+     }
+ 
+     protected static class FlowFilePayloadCallback implements 
InputStreamCallback {
+ 
+         private String contents = "";
+ 
+         @Override
+         public void process(final InputStream in) throws IOException {
+             contents = IOUtils.toString(in);
+         }
+ 
+         public String getContents() {
+             return contents;
+         }
+     }
+ 
+ }

Reply via email to