This is an automated email from the ASF dual-hosted git repository.
kdoran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 74be3ccebe NIFI-15245 Updated ListenSyslog (#10556)
74be3ccebe is described below
commit 74be3ccebe5b1f28e8662e5daee320fba1f01c94
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Mon Nov 24 13:56:13 2025 +0100
NIFI-15245 Updated ListenSyslog (#10556)
* Updated ListenSyslog to keep the original UX with Protocol and Port
properties
---
.../standard/AbstractSyslogProcessor.java | 14 +++
.../nifi/processors/standard/ListenSyslog.java | 140 ++++++++-------------
.../apache/nifi/processors/standard/PutSyslog.java | 18 +--
.../nifi/processors/standard/TestListenSyslog.java | 96 +++++++-------
4 files changed, 113 insertions(+), 155 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index fd4c6f5bfa..9768fbc0aa 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -30,6 +30,20 @@ public abstract class AbstractSyslogProcessor extends
AbstractProcessor {
public static final AllowableValue TCP_VALUE = new AllowableValue("TCP",
"TCP");
public static final AllowableValue UDP_VALUE = new AllowableValue("UDP",
"UDP");
+ public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
+ .Builder().name("Protocol")
+ .description("The protocol for Syslog communication.")
+ .required(true)
+ .allowableValues(TCP_VALUE, UDP_VALUE)
+ .defaultValue(UDP_VALUE.getValue())
+ .build();
+ public static final PropertyDescriptor PORT = new PropertyDescriptor
+ .Builder().name("Port")
+ .description("The port for Syslog communication. Note that
Expression language is not evaluated per FlowFile.")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
public static final PropertyDescriptor CHARSET = new
PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the Syslog messages.
Note that Expression language is not evaluated per FlowFile.")
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 700c3eee9b..15e03ae7f4 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -40,7 +40,6 @@ import
org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.event.transport.netty.FilteringStrategy;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.migration.PropertyConfiguration;
@@ -101,26 +100,20 @@ import static
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_I
@SeeAlso({PutSyslog.class, ParseSyslog.class})
public class ListenSyslog extends AbstractSyslogProcessor implements
ListenComponent {
- // See migrateProperties() below for how legacy properties are handled on
upgrade
- private static final String LEGACY_PORT_PROPERTY_NAME = "Port";
- private static final String LEGACY_PROTOCOL_PROPERTY_NAME = "Protocol";
-
- public static final PropertyDescriptor TCP_PORT = new PropertyDescriptor
- .Builder().name("TCP Port")
- .description("The port to listen on for TCP Syslog communication.
Either this or UDP Port must be set, but both cannot be set at the same time.")
- .required(false)
- .addValidator(StandardValidators.PORT_VALIDATOR)
+ public static final PropertyDescriptor TCP_PORT = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(PORT)
+ .name("TCP Port")
+ .displayName("TCP Port")
.identifiesListenPort(org.apache.nifi.components.listen.TransportProtocol.TCP,
"syslog")
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .dependsOn(PROTOCOL, TCP_VALUE)
.build();
- public static final PropertyDescriptor UDP_PORT = new PropertyDescriptor
- .Builder().name("UDP Port")
- .description("The port to listen on for UDP Syslog communication.
Either this or TCP Port must be set, but both cannot be set at the same time.")
- .required(false)
- .addValidator(StandardValidators.PORT_VALIDATOR)
+ public static final PropertyDescriptor UDP_PORT = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(PORT)
+ .name("UDP Port")
+ .displayName("UDP Port")
.identifiesListenPort(org.apache.nifi.components.listen.TransportProtocol.UDP,
"syslog")
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .dependsOn(PROTOCOL, UDP_VALUE)
.build();
public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new
PropertyDescriptor.Builder()
@@ -150,7 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
- .dependsOn(TCP_PORT)
+ .dependsOn(PROTOCOL, TCP_VALUE)
.build();
public static final PropertyDescriptor WORKER_THREADS = new
PropertyDescriptor.Builder()
.name("Worker Threads")
@@ -158,7 +151,7 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2")
.required(true)
- .dependsOn(TCP_PORT)
+ .dependsOn(PROTOCOL, TCP_VALUE)
.build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new
PropertyDescriptor.Builder()
.name("Max Batch Size")
@@ -190,7 +183,7 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
"messages will be received over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextProvider.class)
- .dependsOn(TCP_PORT)
+ .dependsOn(PROTOCOL, TCP_VALUE)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
.name("Client Auth")
@@ -207,10 +200,11 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
.defaultValue(Boolean.FALSE.toString())
- .dependsOn(TCP_PORT)
+ .dependsOn(PROTOCOL, TCP_VALUE)
.build();
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
+ PROTOCOL,
TCP_PORT,
UDP_PORT,
NETWORK_INTF_NAME,
@@ -255,32 +249,27 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
propertyConfiguration.renameProperty("Max Number of TCP Connections",
WORKER_THREADS.getName());
propertyConfiguration.renameProperty("socket-keep-alive",
SOCKET_KEEP_ALIVE.getName());
- // Older version of ListenSyslog had a single "Port" property and a
"Protocol" property to specify TCP or UDP.
+ // Older version of ListenSyslog had a single "Port" property.
// Starting with 2.7.0, ListenSyslog now uses two mutually exclusive
"TCP Port" and "UDP Port" properties in order to support the
ListenPortDefinition feature of Property Descriptors.
- if (propertyConfiguration.isPropertySet(LEGACY_PORT_PROPERTY_NAME)) {
- final String legacyProtocolRawValue =
propertyConfiguration.getPropertyValue(LEGACY_PROTOCOL_PROPERTY_NAME).orElse(null);
- TransportProtocol legacyProtocol;
+ if (propertyConfiguration.hasProperty(PORT)) {
+ final String protocolRawValue =
propertyConfiguration.getPropertyValue(PROTOCOL).orElse(null);
+ TransportProtocol protocol;
try {
- legacyProtocol = legacyProtocolRawValue != null ?
TransportProtocol.valueOf(legacyProtocolRawValue) : TransportProtocol.TCP;
+ protocol = protocolRawValue != null ?
TransportProtocol.valueOf(protocolRawValue) : TransportProtocol.TCP;
} catch (final Exception e) {
// should never happen, but if we get an unexpected value that
cannot be converted to an enum, default to TCP
- getLogger().warn("Unknown legacy protocol '{}' provided for
ListenSyslog. Defaulting to TCP", legacyProtocolRawValue);
- legacyProtocol = TransportProtocol.TCP;
+ getLogger().warn("Unknown legacy protocol '{}' provided for
ListenSyslog. Defaulting to TCP", protocolRawValue);
+ protocol = TransportProtocol.TCP;
}
- if (legacyProtocol == TransportProtocol.UDP) {
+ if (protocol == TransportProtocol.UDP) {
getLogger().info("Migrating ListenSyslog 'Port' property to
'UDP Port'");
-
propertyConfiguration.renameProperty(LEGACY_PORT_PROPERTY_NAME,
UDP_PORT.getName());
+ propertyConfiguration.renameProperty(PORT.getName(),
UDP_PORT.getName());
} else {
getLogger().info("Migrating ListenSyslog 'Port' property to
'TCP Port'");
-
propertyConfiguration.renameProperty(LEGACY_PORT_PROPERTY_NAME,
TCP_PORT.getName());
+ propertyConfiguration.renameProperty(PORT.getName(),
TCP_PORT.getName());
}
}
-
- if (propertyConfiguration.hasProperty(LEGACY_PROTOCOL_PROPERTY_NAME)) {
- getLogger().info("Removing deprecated ListenSyslog 'Protocol'
property");
-
propertyConfiguration.removeProperty(LEGACY_PROTOCOL_PROPERTY_NAME);
- }
}
@Override
@@ -295,9 +284,9 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String
oldValue, String newValue) {
- // if we are changing the port (and possibly protocol), the events
that we may have queued up are no longer valid,
- // as they were received on a different port and may be from a
completely different source
- if (TCP_PORT.equals(descriptor) || UDP_PORT.equals(descriptor)) {
+ // if we are changing the protocol, the events that we may have queued
up are no longer valid, as they
+ // were received using a different protocol and may be from a
completely different source
+ if (PROTOCOL.equals(descriptor)) {
syslogEvents.clear();
}
}
@@ -310,27 +299,12 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
.explanation("Cannot set Parse Messages to 'true' if Batch
Size is greater than 1").build());
}
- if (!validationContext.getProperty(TCP_PORT).isSet() &&
!validationContext.getProperty(UDP_PORT).isSet()) {
- results.add(new
ValidationResult.Builder().subject(UDP_PORT.getName()).valid(false)
- .explanation("UDP Port must be set unless TCP Port is
set").build());
- results.add(new
ValidationResult.Builder().subject(TCP_PORT.getName()).valid(false)
- .explanation("TCP Port must be set unless UDP Port is
set").build());
- }
-
- if (validationContext.getProperty(TCP_PORT).isSet() &&
validationContext.getProperty(UDP_PORT).isSet()) {
- results.add(new
ValidationResult.Builder().subject(UDP_PORT.getName()).input(validationContext.getProperty(UDP_PORT).getValue()).valid(false)
- .explanation("Cannot set UDP Port when TCP Port is also
set").build());
- results.add(new
ValidationResult.Builder().subject(TCP_PORT.getName()).input(validationContext.getProperty(TCP_PORT).getValue()).valid(false)
- .explanation("Cannot set TCP Port when UDP Port is also
set").build());
- }
-
return results;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
- final TransportProtocol transportProtocol =
getTransportProtocol(context);
- final int port = getConfiguredPortNumber(context);
+ final TransportProtocol transportProtocol =
context.getProperty(PROTOCOL).asAllowableValue(TransportProtocol.class);
final int receiveBufferSize =
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxMessageQueueSize =
context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
@@ -341,16 +315,19 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
parser = new SyslogParser(charset);
syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
+ final int port;
final int maxSocketBufferSize;
final int workerThreads;
final SSLContextProvider sslContextProvider;
final Boolean socketKeepAlive;
if (transportProtocol == TransportProtocol.TCP) {
+ port =
context.getProperty(TCP_PORT).evaluateAttributeExpressions().asInteger();
maxSocketBufferSize =
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
workerThreads =
context.getProperty(WORKER_THREADS).asLong().intValue();
sslContextProvider =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
socketKeepAlive =
context.getProperty(SOCKET_KEEP_ALIVE).asBoolean();
} else {
+ port =
context.getProperty(UDP_PORT).evaluateAttributeExpressions().asInteger();
maxSocketBufferSize = 1_000_000;
workerThreads = 2;
sslContextProvider = null;
@@ -380,50 +357,31 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
eventServer = factory.getEventServer();
}
- private TransportProtocol getTransportProtocol(final ProcessContext
context) {
- if (context.getProperty(TCP_PORT).isSet() &&
!context.getProperty(UDP_PORT).isSet()) {
- return TransportProtocol.TCP;
- } else if (context.getProperty(UDP_PORT).isSet() &&
!context.getProperty(TCP_PORT).isSet()) {
- return TransportProtocol.UDP;
- } else {
- throw new IllegalStateException("Processor is invalid. Exactly one
of TCP Port or UDP Port properties must be set.");
- }
- }
-
- private int getConfiguredPortNumber(final ProcessContext context) {
- if (context.getProperty(TCP_PORT).isSet() &&
!context.getProperty(UDP_PORT).isSet()) {
- return
context.getProperty(TCP_PORT).evaluateAttributeExpressions().asInteger();
- } else if (context.getProperty(UDP_PORT).isSet() &&
!context.getProperty(TCP_PORT).isSet()) {
- return
context.getProperty(UDP_PORT).evaluateAttributeExpressions().asInteger();
- } else {
- throw new IllegalStateException("Processor is invalid. Exactly one
of TCP Port or UDP Port properties must be set.");
- }
- }
-
@Override
public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
- final Integer tcpPortNumber =
context.getProperty(TCP_PORT).evaluateAttributeExpressions().asInteger();
- final Integer udpPortNumber =
context.getProperty(UDP_PORT).evaluateAttributeExpressions().asInteger();
-
final List<ListenPort> ports = new ArrayList<>();
- if (tcpPortNumber != null) {
+ final TransportProtocol transportProtocol =
context.getProperty(PROTOCOL).asAllowableValue(TransportProtocol.class);
+
+ if (transportProtocol == TransportProtocol.TCP) {
+ final Integer tcpPortNumber =
context.getProperty(TCP_PORT).evaluateAttributeExpressions().asInteger();
final ListenPort port = StandardListenPort.builder()
- .portNumber(tcpPortNumber)
- .portName(TCP_PORT.getDisplayName())
-
.transportProtocol(org.apache.nifi.components.listen.TransportProtocol.TCP)
- .applicationProtocols(List.of("syslog"))
- .build();
+ .portNumber(tcpPortNumber)
+ .portName(TCP_PORT.getDisplayName())
+
.transportProtocol(org.apache.nifi.components.listen.TransportProtocol.TCP)
+ .applicationProtocols(List.of("syslog"))
+ .build();
ports.add(port);
}
- if (udpPortNumber != null) {
+ if (transportProtocol == TransportProtocol.UDP) {
+ final Integer udpPortNumber =
context.getProperty(UDP_PORT).evaluateAttributeExpressions().asInteger();
final ListenPort port = StandardListenPort.builder()
- .portNumber(udpPortNumber)
- .portName(UDP_PORT.getDisplayName())
-
.transportProtocol(org.apache.nifi.components.listen.TransportProtocol.UDP)
- .applicationProtocols(List.of("syslog"))
- .build();
+ .portNumber(udpPortNumber)
+ .portName(UDP_PORT.getDisplayName())
+
.transportProtocol(org.apache.nifi.components.listen.TransportProtocol.UDP)
+ .applicationProtocols(List.of("syslog"))
+ .build();
ports.add(port);
}
@@ -570,7 +528,7 @@ public class ListenSyslog extends AbstractSyslogProcessor
implements ListenCompo
private Map<String, String> getDefaultAttributes(final ProcessContext
context) {
final String port = String.valueOf(getListeningPort());
- final String protocol = getTransportProtocol(context).toString();
+ final String protocol = context.getProperty(PROTOCOL).getValue();
final Map<String, String> defaultAttributes = new HashMap<>();
defaultAttributes.put(SyslogAttributes.SYSLOG_PROTOCOL.key(),
protocol);
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index dc4113b604..ea5ffe2633 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -76,22 +76,6 @@ public class PutSyslog extends AbstractSyslogProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
- public static final PropertyDescriptor PORT = new PropertyDescriptor
- .Builder().name("Port")
- .description("The destination port on the remote Syslog server to use
for sending Syslog messages.")
- .required(true)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
- .Builder().name("Protocol")
- .description("The protocol to use for sending Syslog messages to the
Syslog server.")
- .required(true)
- .allowableValues(TCP_VALUE, UDP_VALUE)
- .defaultValue(UDP_VALUE.getValue())
- .build();
-
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new
PropertyDescriptor.Builder()
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that
should be used. This is a suggestion to the Operating System " +
@@ -167,8 +151,8 @@ public class PutSyslog extends AbstractSyslogProcessor {
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
HOSTNAME,
- PORT,
PROTOCOL,
+ PORT,
MAX_SOCKET_SEND_BUFFER_SIZE,
SSL_CONTEXT_SERVICE,
IDLE_EXPIRATION,
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 8295b9ff7b..bc88e50a84 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -16,7 +16,8 @@
*/
package org.apache.nifi.processors.standard;
-import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.LineEnding;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
@@ -27,6 +28,8 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.syslog.attributes.SyslogAttributes;
+import org.apache.nifi.util.EmptyControllerServiceLookup;
+import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -37,11 +40,12 @@ import org.junit.jupiter.api.Test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.Collection;
import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestListenSyslog {
@@ -75,54 +79,10 @@ public class TestListenSyslog {
processor.shutdownEventServer();
}
- @Test
- public void testCustomValidate() throws Exception {
-
- // Neither TCP Port nor UDP Port is set by default, assert invalid by
default
- Collection<ValidationResult> validationResultCollection =
runner.validate();
-
- assertEquals(2, validationResultCollection.size());
- assertTrue(validationResultCollection.stream().anyMatch(result ->
- result.getSubject().equals(ListenSyslog.TCP_PORT.getName())
- && !result.isValid()
- && result.getExplanation().contains("TCP Port must be set unless
UDP Port is set")));
- assertTrue(validationResultCollection.stream().anyMatch(result ->
- result.getSubject().equals(ListenSyslog.UDP_PORT.getName())
- && !result.isValid()
- && result.getExplanation().contains("UDP Port must be set
unless TCP Port is set")));
-
- // Now set only TCP Port and assert valid
- runner.setProperty(ListenSyslog.TCP_PORT, "0");
- validationResultCollection = runner.validate();
-
- assertEquals(0, validationResultCollection.size());
-
- // Now set only UDP Port and assert valid
- runner.clearProperties();
- runner.setProperty(ListenSyslog.UDP_PORT, "1");
- validationResultCollection = runner.validate();
-
- assertEquals(0, validationResultCollection.size());
-
- // Now set both TCP and UDP Port and assert invalid
- runner.setProperty(ListenSyslog.TCP_PORT, "0");
- runner.setProperty(ListenSyslog.UDP_PORT, "1");
- validationResultCollection = runner.validate();
-
- assertEquals(2, validationResultCollection.size());
- assertTrue(validationResultCollection.stream().anyMatch(result ->
- result.getSubject().equals(ListenSyslog.TCP_PORT.getName())
- && !result.isValid()
- && result.getExplanation().contains("annot set TCP Port when
UDP Port is also set")));
- assertTrue(validationResultCollection.stream().anyMatch(result ->
- result.getSubject().equals(ListenSyslog.UDP_PORT.getName())
- && !result.isValid()
- && result.getExplanation().contains("Cannot set UDP Port when
TCP Port is also set")));
- }
-
@Test
public void testRunTcp() throws Exception {
final TransportProtocol protocol = TransportProtocol.TCP;
+ runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.TCP_PORT, "0");
runner.setProperty(ListenSyslog.SOCKET_KEEP_ALIVE,
Boolean.FALSE.toString());
@@ -132,6 +92,7 @@ public class TestListenSyslog {
@Test
public void testRunTcpBatchParseDisabled() throws Exception {
final TransportProtocol protocol = TransportProtocol.TCP;
+ runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.TCP_PORT, "0");
runner.setProperty(ListenSyslog.SOCKET_KEEP_ALIVE,
Boolean.FALSE.toString());
runner.setProperty(ListenSyslog.PARSE_MESSAGES,
Boolean.FALSE.toString());
@@ -158,6 +119,7 @@ public class TestListenSyslog {
@Test
public void testRunUdp() throws Exception {
final TransportProtocol protocol = TransportProtocol.UDP;
+ runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.UDP_PORT, "0");
assertSendSuccess(protocol);
@@ -166,6 +128,7 @@ public class TestListenSyslog {
@Test
public void testRunUdpBatch() throws Exception {
final TransportProtocol protocol = TransportProtocol.UDP;
+ runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.UDP_PORT, "0");
final String[] messages = new String[]{VALID_MESSAGE, VALID_MESSAGE};
@@ -191,6 +154,7 @@ public class TestListenSyslog {
@Test
public void testRunUdpInvalid() throws Exception {
final TransportProtocol protocol = TransportProtocol.UDP;
+ runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
runner.setProperty(ListenSyslog.UDP_PORT, "0");
runner.run(1, STOP_ON_FINISH_DISABLED);
@@ -262,4 +226,42 @@ public class TestListenSyslog {
}
}
}
+
+ @Test
+ public void testGetListenPortsTcp() {
+ tesGetListenPorts(TransportProtocol.TCP, 1);
+ }
+
+ @Test
+ public void testGetListenPortsUdp() {
+ tesGetListenPorts(TransportProtocol.UDP, 2);
+ }
+
+ private void tesGetListenPorts(final TransportProtocol protocol, final int
portNumber) {
+ runner.setProperty(ListenSyslog.TCP_PORT, "0");
+ runner.setProperty(ListenSyslog.UDP_PORT, "0");
+
+ runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
+ if (protocol == TransportProtocol.TCP) {
+ runner.setProperty(ListenSyslog.TCP_PORT,
String.valueOf(portNumber));
+ } else {
+ runner.setProperty(ListenSyslog.UDP_PORT,
String.valueOf(portNumber));
+ }
+
+ final ConfigurationContext configurationContext = new
MockConfigurationContext(runner.getProcessContext().getProperties(), new
EmptyControllerServiceLookup(), Map.of());
+ final List<ListenPort> listenPorts =
processor.getListenPorts(configurationContext);
+
+ assertNotNull(listenPorts);
+ assertEquals(1, listenPorts.size());
+ final ListenPort listenPort = listenPorts.get(0);
+ assertEquals(portNumber, listenPort.getPortNumber());
+ assertEquals(convertProtocol(protocol),
listenPort.getTransportProtocol());
+ assertEquals(List.of("syslog"), listenPort.getApplicationProtocols());
+ }
+
+ private org.apache.nifi.components.listen.TransportProtocol
convertProtocol(final TransportProtocol protocol) {
+ return protocol == TransportProtocol.TCP
+ ? org.apache.nifi.components.listen.TransportProtocol.TCP
+ : org.apache.nifi.components.listen.TransportProtocol.UDP;
+ }
}