This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 64fa7c52ab NIFI-14815 Renamed Max Connections to Worker Threads in ListenSyslog
64fa7c52ab is described below
commit 64fa7c52abbd8d193a3028704b91f4b4920b6265
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Aug 1 18:54:33 2025 -0500
NIFI-14815 Renamed Max Connections to Worker Threads in ListenSyslog
- Updated description to reflect Worker Threads behavior
- Removed unnecessary display name field from Property Descriptors
Signed-off-by: Pierre Villard <[email protected]>
This closes #10165.
---
.../nifi/processors/standard/ListenSyslog.java | 35 ++++++++++------------
1 file changed, 16 insertions(+), 19 deletions(-)
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 0ded28eba8..4cb7d8cbf8 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -38,6 +38,7 @@ import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFac
import org.apache.nifi.event.transport.netty.FilteringStrategy;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -97,7 +98,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new
PropertyDescriptor.Builder()
.name("Max Size of Message Queue")
- .displayName("Max Size of Message Queue")
.description("The maximum size of the internal queue used to buffer
messages being transferred from the underlying channel to the processor. " +
"Setting this value higher allows more messages to be
buffered in memory during surges of incoming messages, but increases the total
" +
"memory used by the processor.")
@@ -108,7 +108,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public static final PropertyDescriptor RECV_BUFFER_SIZE = new
PropertyDescriptor.Builder()
.name("Receive Buffer Size")
- .displayName("Receive Buffer Size")
.description("The size of each buffer used to receive Syslog messages.
Adjust this value appropriately based on the expected size of the " +
"incoming Syslog messages. When UDP is selected each
buffer will hold one Syslog message. When TCP is selected messages are read " +
"from an incoming connection until the buffer is full, or
the connection is closed. ")
@@ -118,7 +117,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new
PropertyDescriptor.Builder()
.name("Max Size of Socket Buffer")
- .displayName("Max Size of Socket Buffer")
.description("The maximum size of the socket buffer that should be
used. This is a suggestion to the Operating System " +
"to indicate how big the socket buffer should be. If this
value is set too low, the buffer may fill up before " +
"the data can be read, and incoming data will be dropped.")
@@ -127,10 +125,9 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.required(true)
.dependsOn(PROTOCOL, TCP_VALUE)
.build();
- public static final PropertyDescriptor MAX_CONNECTIONS = new
PropertyDescriptor.Builder()
- .name("Max Number of TCP Connections")
- .displayName("Max Number of TCP Connections")
- .description("The maximum number of concurrent connections to accept
Syslog messages in TCP mode.")
+ public static final PropertyDescriptor WORKER_THREADS = new
PropertyDescriptor.Builder()
+ .name("Worker Threads")
+ .description("Number of threads responsible for decoding and queuing
incoming syslog messages")
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2")
.required(true)
@@ -138,7 +135,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new
PropertyDescriptor.Builder()
.name("Max Batch Size")
- .displayName("Max Batch Size")
.description(
"The maximum number of Syslog events to add to a single
FlowFile. If multiple events are available, they will be concatenated along
with "
+ "the <Message Delimiter> up to this configured maximum
number of messages")
@@ -148,7 +144,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new
PropertyDescriptor.Builder()
.name("Message Delimiter")
- .displayName("Message Delimiter")
.description("Specifies the delimiter to place between Syslog messages
when multiple messages are bundled together (see <Max Batch Size> property).")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\n")
@@ -156,7 +151,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor PARSE_MESSAGES = new
PropertyDescriptor.Builder()
.name("Parse Messages")
- .displayName("Parse Messages")
.description("Indicates if the processor should parse the Syslog
messages. If set to false, each outgoing FlowFile will only " +
"contain the sender, protocol, and port, and no additional
attributes.")
.allowableValues("true", "false")
@@ -165,7 +159,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
.name("SSL Context Service")
- .displayName("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL
Context. If this property is set, syslog " +
"messages will be received over a secure connection.")
.required(false)
@@ -174,7 +167,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build();
public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
.name("Client Auth")
- .displayName("Client Auth")
.description("The client authentication policy to use for the SSL
Context. Only used if an SSL Context Service is provided.")
.required(false)
.allowableValues(ClientAuth.values())
@@ -182,8 +174,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.dependsOn(SSL_CONTEXT_SERVICE)
.build();
public static final PropertyDescriptor SOCKET_KEEP_ALIVE = new
PropertyDescriptor.Builder()
- .name("socket-keep-alive")
- .displayName("Socket Keep Alive")
+ .name("Socket Keep Alive")
.description("Whether or not to have TCP socket keep alive turned
on. Timing details depend on operating system properties.")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
@@ -202,7 +193,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
RECV_BUFFER_SIZE,
MAX_MESSAGE_QUEUE_SIZE,
MAX_SOCKET_BUFFER_SIZE,
- MAX_CONNECTIONS,
+ WORKER_THREADS,
MAX_BATCH_SIZE,
MESSAGE_DELIMITER,
PARSE_MESSAGES,
@@ -232,6 +223,12 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private volatile BlockingQueue<ByteArrayMessage> syslogEvents = new
LinkedBlockingQueue<>();
private volatile byte[] messageDemarcatorBytes; //it is only the array
reference that is volatile - not the contents.
+ @Override
+ public void migrateProperties(final PropertyConfiguration
propertyConfiguration) {
+ propertyConfiguration.renameProperty("Max Number of TCP Connections",
WORKER_THREADS.getName());
+ propertyConfiguration.renameProperty("socket-keep-alive",
SOCKET_KEEP_ALIVE.getName());
+ }
+
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
@@ -277,17 +274,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
final int maxSocketBufferSize;
- final int maxConnections;
+ final int workerThreads;
final SSLContextProvider sslContextProvider;
final Boolean socketKeepAlive;
if (transportProtocol == TransportProtocol.TCP) {
maxSocketBufferSize =
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
- maxConnections =
context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+ workerThreads =
context.getProperty(WORKER_THREADS).asLong().intValue();
sslContextProvider =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
socketKeepAlive =
context.getProperty(SOCKET_KEEP_ALIVE).asBoolean();
} else {
maxSocketBufferSize = 1_000_000;
- maxConnections = 2;
+ workerThreads = 2;
sslContextProvider = null;
socketKeepAlive = false;
}
@@ -297,7 +294,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
address, port, transportProtocol, messageDemarcatorBytes,
receiveBufferSize, syslogEvents, FilteringStrategy.EMPTY);
factory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
factory.setThreadNamePrefix(String.format("%s[%s]",
ListenSyslog.class.getSimpleName(), getIdentifier()));
- factory.setWorkerThreads(maxConnections);
+ factory.setWorkerThreads(workerThreads);
factory.setSocketReceiveBuffer(maxSocketBufferSize);
factory.setSocketKeepAlive(socketKeepAlive);