This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8c876da6fe NIFI-15212 Update ListenSyslog to support Listen Ports
8c876da6fe is described below
commit 8c876da6fed05bc6f4d68c79d8c1e1b768221f82
Author: Kevin Doran <[email protected]>
AuthorDate: Wed Nov 12 13:51:10 2025 -0500
NIFI-15212 Update ListenSyslog to support Listen Ports
- Refactor Port and Protocol properties to mutally exclusive TCP Port and
UDP Port properties
- Refactor shared members between ListenSyslog and PutSyslog to be distinct
where necessary
- Add ListenPort and ListenComponent API support to ListenSyslog
- Add custom migrate and custom validation logic for new properties
Signed-off-by: Pierre Villard <[email protected]>
This closes #10531.
---
.../standard/AbstractSyslogProcessor.java | 14 --
.../nifi/processors/standard/ListenSyslog.java | 146 +++++++++++++++++++--
.../apache/nifi/processors/standard/PutSyslog.java | 18 ++-
.../nifi/processors/standard/TestListenSyslog.java | 62 +++++++--
4 files changed, 201 insertions(+), 39 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index 9768fbc0aa..fd4c6f5bfa 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -30,20 +30,6 @@ public abstract class AbstractSyslogProcessor extends
AbstractProcessor {
public static final AllowableValue TCP_VALUE = new AllowableValue("TCP",
"TCP");
public static final AllowableValue UDP_VALUE = new AllowableValue("UDP",
"UDP");
- public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
- .Builder().name("Protocol")
- .description("The protocol for Syslog communication.")
- .required(true)
- .allowableValues(TCP_VALUE, UDP_VALUE)
- .defaultValue(UDP_VALUE.getValue())
- .build();
- public static final PropertyDescriptor PORT = new PropertyDescriptor
- .Builder().name("Port")
- .description("The port for Syslog communication. Note that
Expression language is not evaluated per FlowFile.")
- .required(true)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
public static final PropertyDescriptor CHARSET = new
PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the Syslog messages.
Note that Expression language is not evaluated per FlowFile.")
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 4cb7d8cbf8..700c3eee9b 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -30,12 +30,17 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.listen.ListenComponent;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.components.listen.StandardListenPort;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.event.transport.netty.FilteringStrategy;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.migration.PropertyConfiguration;
@@ -94,7 +99,29 @@ import static
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_I
@WritesAttribute(attribute = "syslog.port", description =
"The port over which the Syslog message was received."),
@WritesAttribute(attribute = "mime.type", description =
"The mime.type of the FlowFile which will be text/plain for Syslog messages.")})
@SeeAlso({PutSyslog.class, ParseSyslog.class})
-public class ListenSyslog extends AbstractSyslogProcessor {
+public class ListenSyslog extends AbstractSyslogProcessor implements
ListenComponent {
+
+ // See migrateProperties() below for how legacy properties are handled on
upgrade
+ private static final String LEGACY_PORT_PROPERTY_NAME = "Port";
+ private static final String LEGACY_PROTOCOL_PROPERTY_NAME = "Protocol";
+
+ public static final PropertyDescriptor TCP_PORT = new PropertyDescriptor
+ .Builder().name("TCP Port")
+ .description("The port to listen on for TCP Syslog communication.
Either this or UDP Port must be set, but both cannot be set at the same time.")
+ .required(false)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+
.identifiesListenPort(org.apache.nifi.components.listen.TransportProtocol.TCP,
"syslog")
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ public static final PropertyDescriptor UDP_PORT = new PropertyDescriptor
+ .Builder().name("UDP Port")
+ .description("The port to listen on for UDP Syslog communication.
Either this or TCP Port must be set, but both cannot be set at the same time.")
+ .required(false)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+
.identifiesListenPort(org.apache.nifi.components.listen.TransportProtocol.UDP,
"syslog")
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new
PropertyDescriptor.Builder()
.name("Max Size of Message Queue")
@@ -123,7 +150,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.required(true)
- .dependsOn(PROTOCOL, TCP_VALUE)
+ .dependsOn(TCP_PORT)
.build();
public static final PropertyDescriptor WORKER_THREADS = new
PropertyDescriptor.Builder()
.name("Worker Threads")
@@ -131,7 +158,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2")
.required(true)
- .dependsOn(PROTOCOL, TCP_VALUE)
+ .dependsOn(TCP_PORT)
.build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new
PropertyDescriptor.Builder()
.name("Max Batch Size")
@@ -163,7 +190,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
"messages will be received over a secure connection.")
.required(false)
.identifiesControllerService(SSLContextProvider.class)
- .dependsOn(PROTOCOL, TCP_VALUE)
+ .dependsOn(TCP_PORT)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
.name("Client Auth")
@@ -180,12 +207,12 @@ public class ListenSyslog extends AbstractSyslogProcessor
{
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
.defaultValue(Boolean.FALSE.toString())
- .dependsOn(PROTOCOL, TCP_VALUE)
+ .dependsOn(TCP_PORT)
.build();
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
- PROTOCOL,
- PORT,
+ TCP_PORT,
+ UDP_PORT,
NETWORK_INTF_NAME,
SOCKET_KEEP_ALIVE,
SSL_CONTEXT_SERVICE,
@@ -227,6 +254,33 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public void migrateProperties(final PropertyConfiguration
propertyConfiguration) {
propertyConfiguration.renameProperty("Max Number of TCP Connections",
WORKER_THREADS.getName());
propertyConfiguration.renameProperty("socket-keep-alive",
SOCKET_KEEP_ALIVE.getName());
+
+ // Older version of ListenSyslog had a single "Port" property and a
"Protocol" property to specify TCP or UDP.
+ // Starting with 2.7.0, ListenSyslog now uses two mutually exclusive
"TCP Port" and "UDP Port" properties in order to support the
ListenPortDefinition feature of Property Descriptors.
+ if (propertyConfiguration.isPropertySet(LEGACY_PORT_PROPERTY_NAME)) {
+ final String legacyProtocolRawValue =
propertyConfiguration.getPropertyValue(LEGACY_PROTOCOL_PROPERTY_NAME).orElse(null);
+ TransportProtocol legacyProtocol;
+ try {
+ legacyProtocol = legacyProtocolRawValue != null ?
TransportProtocol.valueOf(legacyProtocolRawValue) : TransportProtocol.TCP;
+ } catch (final Exception e) {
+ // should never happen, but if we get an unexpected value that
cannot be converted to an enum, default to TCP
+ getLogger().warn("Unknown legacy protocol '{}' provided for
ListenSyslog. Defaulting to TCP", legacyProtocolRawValue);
+ legacyProtocol = TransportProtocol.TCP;
+ }
+
+ if (legacyProtocol == TransportProtocol.UDP) {
+ getLogger().info("Migrating ListenSyslog 'Port' property to
'UDP Port'");
+
propertyConfiguration.renameProperty(LEGACY_PORT_PROPERTY_NAME,
UDP_PORT.getName());
+ } else {
+ getLogger().info("Migrating ListenSyslog 'Port' property to
'TCP Port'");
+
propertyConfiguration.renameProperty(LEGACY_PORT_PROPERTY_NAME,
TCP_PORT.getName());
+ }
+ }
+
+ if (propertyConfiguration.hasProperty(LEGACY_PROTOCOL_PROPERTY_NAME)) {
+ getLogger().info("Removing deprecated ListenSyslog 'Protocol'
property");
+
propertyConfiguration.removeProperty(LEGACY_PROTOCOL_PROPERTY_NAME);
+ }
}
@Override
@@ -241,9 +295,9 @@ public class ListenSyslog extends AbstractSyslogProcessor {
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String
oldValue, String newValue) {
- // if we are changing the protocol, the events that we may have queued
up are no longer valid, as they
- // were received using a different protocol and may be from a
completely different source
- if (PROTOCOL.equals(descriptor)) {
+ // if we are changing the port (and possibly protocol), the events
that we may have queued up are no longer valid,
+ // as they were received on a different port and may be from a
completely different source
+ if (TCP_PORT.equals(descriptor) || UDP_PORT.equals(descriptor)) {
syslogEvents.clear();
}
}
@@ -256,14 +310,28 @@ public class ListenSyslog extends AbstractSyslogProcessor
{
.explanation("Cannot set Parse Messages to 'true' if Batch
Size is greater than 1").build());
}
+ if (!validationContext.getProperty(TCP_PORT).isSet() &&
!validationContext.getProperty(UDP_PORT).isSet()) {
+ results.add(new
ValidationResult.Builder().subject(UDP_PORT.getName()).valid(false)
+ .explanation("UDP Port must be set unless TCP Port is
set").build());
+ results.add(new
ValidationResult.Builder().subject(TCP_PORT.getName()).valid(false)
+ .explanation("TCP Port must be set unless UDP Port is
set").build());
+ }
+
+ if (validationContext.getProperty(TCP_PORT).isSet() &&
validationContext.getProperty(UDP_PORT).isSet()) {
+ results.add(new
ValidationResult.Builder().subject(UDP_PORT.getName()).input(validationContext.getProperty(UDP_PORT).getValue()).valid(false)
+ .explanation("Cannot set UDP Port when TCP Port is also
set").build());
+ results.add(new
ValidationResult.Builder().subject(TCP_PORT.getName()).input(validationContext.getProperty(TCP_PORT).getValue()).valid(false)
+ .explanation("Cannot set TCP Port when UDP Port is also
set").build());
+ }
+
return results;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
- final TransportProtocol transportProtocol =
TransportProtocol.valueOf(context.getProperty(PROTOCOL).getValue());
+ final TransportProtocol transportProtocol =
getTransportProtocol(context);
+ final int port = getConfiguredPortNumber(context);
- final int port =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final int receiveBufferSize =
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxMessageQueueSize =
context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
final String networkInterfaceName =
context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
@@ -312,7 +380,57 @@ public class ListenSyslog extends AbstractSyslogProcessor {
eventServer = factory.getEventServer();
}
- public int getListeningPort() {
+ private TransportProtocol getTransportProtocol(final ProcessContext
context) {
+ if (context.getProperty(TCP_PORT).isSet() &&
!context.getProperty(UDP_PORT).isSet()) {
+ return TransportProtocol.TCP;
+ } else if (context.getProperty(UDP_PORT).isSet() &&
!context.getProperty(TCP_PORT).isSet()) {
+ return TransportProtocol.UDP;
+ } else {
+ throw new IllegalStateException("Processor is invalid. Exactly one
of TCP Port or UDP Port properties must be set.");
+ }
+ }
+
+ private int getConfiguredPortNumber(final ProcessContext context) {
+ if (context.getProperty(TCP_PORT).isSet() &&
!context.getProperty(UDP_PORT).isSet()) {
+ return
context.getProperty(TCP_PORT).evaluateAttributeExpressions().asInteger();
+ } else if (context.getProperty(UDP_PORT).isSet() &&
!context.getProperty(TCP_PORT).isSet()) {
+ return
context.getProperty(UDP_PORT).evaluateAttributeExpressions().asInteger();
+ } else {
+ throw new IllegalStateException("Processor is invalid. Exactly one
of TCP Port or UDP Port properties must be set.");
+ }
+ }
+
+ @Override
+ public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
+ final Integer tcpPortNumber =
context.getProperty(TCP_PORT).evaluateAttributeExpressions().asInteger();
+ final Integer udpPortNumber =
context.getProperty(UDP_PORT).evaluateAttributeExpressions().asInteger();
+
+ final List<ListenPort> ports = new ArrayList<>();
+
+ if (tcpPortNumber != null) {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(tcpPortNumber)
+ .portName(TCP_PORT.getDisplayName())
+
.transportProtocol(org.apache.nifi.components.listen.TransportProtocol.TCP)
+ .applicationProtocols(List.of("syslog"))
+ .build();
+ ports.add(port);
+ }
+
+ if (udpPortNumber != null) {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(udpPortNumber)
+ .portName(UDP_PORT.getDisplayName())
+
.transportProtocol(org.apache.nifi.components.listen.TransportProtocol.UDP)
+ .applicationProtocols(List.of("syslog"))
+ .build();
+ ports.add(port);
+ }
+
+ return ports;
+ }
+
+ int getListeningPort() {
return eventServer == null ? 0 : eventServer.getListeningPort();
}
@@ -452,7 +570,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
private Map<String, String> getDefaultAttributes(final ProcessContext
context) {
final String port = String.valueOf(getListeningPort());
- final String protocol = context.getProperty(PROTOCOL).getValue();
+ final String protocol = getTransportProtocol(context).toString();
final Map<String, String> defaultAttributes = new HashMap<>();
defaultAttributes.put(SyslogAttributes.SYSLOG_PROTOCOL.key(),
protocol);
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index ea5ffe2633..dc4113b604 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -76,6 +76,22 @@ public class PutSyslog extends AbstractSyslogProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
+ public static final PropertyDescriptor PORT = new PropertyDescriptor
+ .Builder().name("Port")
+ .description("The destination port on the remote Syslog server to use
for sending Syslog messages.")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
+ .Builder().name("Protocol")
+ .description("The protocol to use for sending Syslog messages to the
Syslog server.")
+ .required(true)
+ .allowableValues(TCP_VALUE, UDP_VALUE)
+ .defaultValue(UDP_VALUE.getValue())
+ .build();
+
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new
PropertyDescriptor.Builder()
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that
should be used. This is a suggestion to the Operating System " +
@@ -151,8 +167,8 @@ public class PutSyslog extends AbstractSyslogProcessor {
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
HOSTNAME,
- PROTOCOL,
PORT,
+ PROTOCOL,
MAX_SOCKET_SEND_BUFFER_SIZE,
SSL_CONTEXT_SERVICE,
IDLE_EXPIRATION,
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index d12803a3bb..8295b9ff7b 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.LineEnding;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
@@ -36,6 +37,7 @@ import org.junit.jupiter.api.Test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.Collection;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,11 +75,55 @@ public class TestListenSyslog {
processor.shutdownEventServer();
}
+ @Test
+ public void testCustomValidate() throws Exception {
+
+ // Neither TCP Port nor UDP Port is set by default, assert invalid by
default
+ Collection<ValidationResult> validationResultCollection =
runner.validate();
+
+ assertEquals(2, validationResultCollection.size());
+ assertTrue(validationResultCollection.stream().anyMatch(result ->
+ result.getSubject().equals(ListenSyslog.TCP_PORT.getName())
+ && !result.isValid()
+ && result.getExplanation().contains("TCP Port must be set unless
UDP Port is set")));
+ assertTrue(validationResultCollection.stream().anyMatch(result ->
+ result.getSubject().equals(ListenSyslog.UDP_PORT.getName())
+ && !result.isValid()
+ && result.getExplanation().contains("UDP Port must be set
unless TCP Port is set")));
+
+ // Now set only TCP Port and assert valid
+ runner.setProperty(ListenSyslog.TCP_PORT, "0");
+ validationResultCollection = runner.validate();
+
+ assertEquals(0, validationResultCollection.size());
+
+ // Now set only UDP Port and assert valid
+ runner.clearProperties();
+ runner.setProperty(ListenSyslog.UDP_PORT, "1");
+ validationResultCollection = runner.validate();
+
+ assertEquals(0, validationResultCollection.size());
+
+ // Now set both TCP and UDP Port and assert invalid
+ runner.setProperty(ListenSyslog.TCP_PORT, "0");
+ runner.setProperty(ListenSyslog.UDP_PORT, "1");
+ validationResultCollection = runner.validate();
+
+ assertEquals(2, validationResultCollection.size());
+ assertTrue(validationResultCollection.stream().anyMatch(result ->
+ result.getSubject().equals(ListenSyslog.TCP_PORT.getName())
+ && !result.isValid()
+ && result.getExplanation().contains("annot set TCP Port when
UDP Port is also set")));
+ assertTrue(validationResultCollection.stream().anyMatch(result ->
+ result.getSubject().equals(ListenSyslog.UDP_PORT.getName())
+ && !result.isValid()
+ && result.getExplanation().contains("Cannot set UDP Port when
TCP Port is also set")));
+ }
+
@Test
public void testRunTcp() throws Exception {
final TransportProtocol protocol = TransportProtocol.TCP;
- runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
- runner.setProperty(ListenSyslog.PORT, "0");
+ runner.setProperty(ListenSyslog.TCP_PORT, "0");
runner.setProperty(ListenSyslog.SOCKET_KEEP_ALIVE,
Boolean.FALSE.toString());
assertSendSuccess(protocol);
@@ -86,8 +132,7 @@ public class TestListenSyslog {
@Test
public void testRunTcpBatchParseDisabled() throws Exception {
final TransportProtocol protocol = TransportProtocol.TCP;
- runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
- runner.setProperty(ListenSyslog.PORT, "0");
+ runner.setProperty(ListenSyslog.TCP_PORT, "0");
runner.setProperty(ListenSyslog.SOCKET_KEEP_ALIVE,
Boolean.FALSE.toString());
runner.setProperty(ListenSyslog.PARSE_MESSAGES,
Boolean.FALSE.toString());
runner.setProperty(ListenSyslog.MAX_BATCH_SIZE, "2");
@@ -113,8 +158,7 @@ public class TestListenSyslog {
@Test
public void testRunUdp() throws Exception {
final TransportProtocol protocol = TransportProtocol.UDP;
- runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
- runner.setProperty(ListenSyslog.PORT, "0");
+ runner.setProperty(ListenSyslog.UDP_PORT, "0");
assertSendSuccess(protocol);
}
@@ -122,8 +166,7 @@ public class TestListenSyslog {
@Test
public void testRunUdpBatch() throws Exception {
final TransportProtocol protocol = TransportProtocol.UDP;
- runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
- runner.setProperty(ListenSyslog.PORT, "0");
+ runner.setProperty(ListenSyslog.UDP_PORT, "0");
final String[] messages = new String[]{VALID_MESSAGE, VALID_MESSAGE};
@@ -148,8 +191,7 @@ public class TestListenSyslog {
@Test
public void testRunUdpInvalid() throws Exception {
final TransportProtocol protocol = TransportProtocol.UDP;
- runner.setProperty(ListenSyslog.PROTOCOL, protocol.toString());
- runner.setProperty(ListenSyslog.PORT, "0");
+ runner.setProperty(ListenSyslog.UDP_PORT, "0");
runner.run(1, STOP_ON_FINISH_DISABLED);
final int listeningPort = ((ListenSyslog)
runner.getProcessor()).getListeningPort();