Repository: nifi
Updated Branches:
  refs/heads/master a9a9b6743 -> da6ad4f3b


NIFI-3670 - Expose the control of ListenSyslog's CLIENT_AUTH property to DFM

This closes #1720.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da6ad4f3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da6ad4f3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da6ad4f3

Branch: refs/heads/master
Commit: da6ad4f3bcdeb43783aafa9f8942c5fa2a7da20b
Parents: a9a9b67
Author: Andre F de Miranda <[email protected]>
Authored: Mon May 1 22:32:51 2017 +1000
Committer: Bryan Bende <[email protected]>
Committed: Mon May 1 10:41:12 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  | 113 ++++++++++++-------
 1 file changed, 72 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/da6ad4f3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 54d516f..ac874d5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -73,6 +73,7 @@ import org.apache.nifi.processor.util.listen.response.ChannelResponder;
 import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
 import org.apache.nifi.processors.standard.syslog.SyslogEvent;
 import org.apache.nifi.processors.standard.syslog.SyslogParser;
+import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.ssl.SSLContextService;
 
 @SupportsBatching
@@ -103,44 +104,49 @@ import org.apache.nifi.ssl.SSLContextService;
 public class ListenSyslog extends AbstractSyslogProcessor {
 
     public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Max Size of Message Queue")
-            .description("The maximum size of the internal queue used to 
buffer messages being transferred from the underlying channel to the processor. 
" +
+        .name("Max Size of Message Queue")
+        .displayName("Max Size of Message Queue")
+        .description("The maximum size of the internal queue used to buffer 
messages being transferred from the underlying channel to the processor. " +
                     "Setting this value higher allows more messages to be 
buffered in memory during surges of incoming messages, but increases the total 
" +
                     "memory used by the processor.")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("10000")
-            .required(true)
-            .build();
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("10000")
+        .required(true)
+        .build();
     public static final PropertyDescriptor RECV_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Receive Buffer Size")
-            .description("The size of each buffer used to receive Syslog 
messages. Adjust this value appropriately based on the expected size of the " +
+        .name("Receive Buffer Size")
+        .displayName("Receive Buffer Size")
+        .description("The size of each buffer used to receive Syslog messages. 
Adjust this value appropriately based on the expected size of the " +
                     "incoming Syslog messages. When UDP is selected each 
buffer will hold one Syslog message. When TCP is selected messages are read " +
                     "from an incoming connection until the buffer is full, or 
the connection is closed. ")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("65507 B")
-            .required(true)
-            .build();
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("65507 B")
+        .required(true)
+        .build();
     public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Max Size of Socket Buffer")
-            .description("The maximum size of the socket buffer that should be 
used. This is a suggestion to the Operating System " +
+        .name("Max Size of Socket Buffer")
+        .displayName("Max Size of Socket Buffer")
+        .description("The maximum size of the socket buffer that should be 
used. This is a suggestion to the Operating System " +
                     "to indicate how big the socket buffer should be. If this 
value is set too low, the buffer may fill up before " +
                     "the data can be read, and incoming data will be dropped.")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .required(true)
-            .build();
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .required(true)
+        .build();
     public static final PropertyDescriptor MAX_CONNECTIONS = new 
PropertyDescriptor.Builder()
-            .name("Max Number of TCP Connections")
-            .description("The maximum number of concurrent connections to 
accept Syslog messages in TCP mode.")
-            .addValidator(StandardValidators.createLongValidator(1, 65535, 
true))
-            .defaultValue("2")
-            .required(true)
-            .build();
+        .name("Max Number of TCP Connections")
+        .displayName("Max Number of TCP Connections")
+        .description("The maximum number of concurrent connections to accept 
Syslog messages in TCP mode.")
+        .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+        .defaultValue("2")
+        .required(true)
+        .build();
     public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
         .name("Max Batch Size")
+        .displayName("Max Batch Size")
         .description(
                 "The maximum number of Syslog events to add to a single 
FlowFile. If multiple events are available, they will be concatenated along 
with "
-                        + "the <Message Delimiter> up to this configured 
maximum number of messages")
+                + "the <Message Delimiter> up to this configured maximum 
number of messages")
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .expressionLanguageSupported(false)
         .defaultValue("1")
@@ -148,6 +154,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         .build();
     public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
         .name("Message Delimiter")
+        .displayName("Message Delimiter")
         .description("Specifies the delimiter to place between Syslog messages 
when multiple messages are bundled together (see <Max Batch Size> property).")
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .defaultValue("\\n")
@@ -155,28 +162,38 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         .build();
     public static final PropertyDescriptor PARSE_MESSAGES = new 
PropertyDescriptor.Builder()
         .name("Parse Messages")
+        .displayName("Parse Messages")
         .description("Indicates if the processor should parse the Syslog 
messages. If set to false, each outgoing FlowFile will only " +
-            "contain the sender, protocol, and port, and no additional 
attributes.")
+                    "contain the sender, protocol, and port, and no additional 
attributes.")
         .allowableValues("true", "false")
         .defaultValue("true")
         .required(true)
         .build();
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description("The Controller Service to use in order to obtain an 
SSL Context. If this property is set, syslog " +
+        .name("SSL Context Service")
+        .displayName("SSL Context Service")
+        .description("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, syslog " +
                     "messages will be received over a secure connection.")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
+        .required(false)
+        .identifiesControllerService(SSLContextService.class)
+        .build();
+    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+        .name("Client Auth")
+        .displayName("Client Auth")
+        .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
+        .required(false)
+        .allowableValues(SSLContextService.ClientAuth.values())
+        .defaultValue(SSLContextService.ClientAuth.REQUIRED.name())
+        .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Syslog messages that match one of the expected 
formats will be sent out this relationship as a FlowFile per message.")
-            .build();
+        .name("success")
+        .description("Syslog messages that match one of the expected formats 
will be sent out this relationship as a FlowFile per message.")
+        .build();
     public static final Relationship REL_INVALID = new Relationship.Builder()
-            .name("invalid")
-            .description("Syslog messages that do not match one of the 
expected formats will be sent out this relationship as a FlowFile per message.")
-            .build();
+        .name("invalid")
+        .description("Syslog messages that do not match one of the expected 
formats will be sent out this relationship as a FlowFile per message.")
+        .build();
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
@@ -195,6 +212,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         descriptors.add(PORT);
         descriptors.add(NETWORK_INTF_NAME);
         descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(CLIENT_AUTH);
         descriptors.add(RECV_BUFFER_SIZE);
         descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
@@ -252,6 +270,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                     .valid(false).subject("SSL Context").build());
         }
 
+        // Validate CLIENT_AUTH
+        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
+        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
+            results.add(new ValidationResult.Builder()
+                    .explanation("Client Auth must be provided when using TLS/SSL")
+                    .valid(false).subject("Client Auth").build());
+        }
+
+
         return results;
     }
 
@@ -290,7 +317,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         // create either a UDP or TCP reader and call open() to bind to the 
given port
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        channelDispatcher = createChannelReader(protocol, bufferPool, 
syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
+        channelDispatcher = createChannelReader(context, protocol, bufferPool, 
syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
         channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize);
 
         final Thread readerThread = new Thread(channelDispatcher);
@@ -305,7 +332,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     }
 
     // visible for testing to be overridden and provide a mock 
ChannelDispatcher if desired
-    protected ChannelDispatcher createChannelReader(final String protocol, 
final BlockingQueue<ByteBuffer> bufferPool,
+    protected ChannelDispatcher createChannelReader(final ProcessContext 
context, final String protocol, final BlockingQueue<ByteBuffer> bufferPool,
                                                     final 
BlockingQueue<RawSyslogEvent> events, final int maxConnections,
                                                     final SSLContextService 
sslContextService, final Charset charset) throws IOException {
 
@@ -316,12 +343,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         } else {
             // if an SSLContextService was provided then create an SSLContext 
to pass down to the dispatcher
             SSLContext sslContext = null;
+            SslContextFactory.ClientAuth clientAuth = null;
+
             if (sslContextService != null) {
-                sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
+                final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
+                sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuthValue));
+                clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue);
             }
 
             final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
-            return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, charset);
+            return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charset);
         }
     }
 

Reply via email to