This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7e3ffc85d5 NIFI-15204 Updated Listen Components to support discovery
of Listen Ports (#10530)
7e3ffc85d5 is described below
commit 7e3ffc85d5bd800350b3062cf6a28967330cb6ba
Author: Kevin Doran <[email protected]>
AuthorDate: Mon Nov 17 09:30:21 2025 -0500
NIFI-15204 Updated Listen Components to support discovery of Listen Ports
(#10530)
Co-authored-by: David Handermann <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../util/listen/AbstractListenEventProcessor.java | 25 +++++++++++++-
.../processor/util/listen/ListenerProperties.java | 7 ----
.../nifi/processors/opentelemetry/ListenOTLP.java | 36 +++++++++++++++++---
.../nifi/snmp/processors/ListenTrapSNMP.java | 26 ++++++++++++++-
.../processors/standard/HandleHttpRequest.java | 38 ++++++++++++++++++++-
.../apache/nifi/processors/standard/ListenFTP.java | 26 ++++++++++++++-
.../apache/nifi/processors/standard/ListenTCP.java | 39 +++++++++++++++++++---
.../nifi/processors/standard/TestListenTCP.java | 4 +--
.../cache/server/AbstractCacheServer.java | 35 ++++++++++++++++---
.../nifi/websocket/jetty/JettyWebSocketServer.java | 26 +++++++++++++--
10 files changed, 233 insertions(+), 29 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index 5084ddf79c..e66f4015d5 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -19,6 +19,11 @@ package org.apache.nifi.processor.util.listen;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.listen.ListenComponent;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.components.listen.StandardListenPort;
+import org.apache.nifi.components.listen.TransportProtocol;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -56,13 +61,14 @@ import static
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_I
*
* @param <E> the type of events being produced
*/
-public abstract class AbstractListenEventProcessor<E extends Event> extends
AbstractProcessor {
+public abstract class AbstractListenEventProcessor<E extends Event> extends
AbstractProcessor implements ListenComponent {
public static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The port to listen on for communication.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .identifiesListenPort(TransportProtocol.UDP)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor CHARSET = new
PropertyDescriptor.Builder()
@@ -194,6 +200,23 @@ public abstract class AbstractListenEventProcessor<E
extends Event> extends Abst
readerThread.start();
}
+ @Override
+ public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
+ final Integer portNumber =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ final List<ListenPort> ports;
+ if (portNumber == null) {
+ ports = List.of();
+ } else {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(portNumber)
+ .portName(PORT.getDisplayName())
+ .transportProtocol(TransportProtocol.UDP)
+ .build();
+ ports = List.of(port);
+ }
+ return ports;
+ }
+
public int getListeningPort() {
return port;
}
diff --git
a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
index d5a31e1e12..f7d536ff85 100644
---
a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
+++
b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
@@ -84,13 +84,6 @@ public class ListenerProperties {
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
- public static final PropertyDescriptor PORT = new PropertyDescriptor
- .Builder().name("Port")
- .description("The port to listen on for communication.")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .build();
public static final PropertyDescriptor CHARSET = new
PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the received data.")
diff --git
a/nifi-extension-bundles/nifi-opentelemetry-bundle/nifi-opentelemetry-processors/src/main/java/org/apache/nifi/processors/opentelemetry/ListenOTLP.java
b/nifi-extension-bundles/nifi-opentelemetry-bundle/nifi-opentelemetry-processors/src/main/java/org/apache/nifi/processors/opentelemetry/ListenOTLP.java
index 6b373e7b3e..84990793dc 100644
---
a/nifi-extension-bundles/nifi-opentelemetry-bundle/nifi-opentelemetry-processors/src/main/java/org/apache/nifi/processors/opentelemetry/ListenOTLP.java
+++
b/nifi-extension-bundles/nifi-opentelemetry-bundle/nifi-opentelemetry-processors/src/main/java/org/apache/nifi/processors/opentelemetry/ListenOTLP.java
@@ -26,21 +26,26 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.listen.ListenComponent;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.components.listen.StandardListenPort;
+import org.apache.nifi.components.listen.TransportProtocol;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.EventServerFactory;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import
org.apache.nifi.processors.opentelemetry.protocol.TelemetryAttributeName;
-import org.apache.nifi.processors.opentelemetry.io.RequestCallback;
-import org.apache.nifi.processors.opentelemetry.io.RequestCallbackProvider;
-import org.apache.nifi.processors.opentelemetry.server.HttpServerFactory;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.opentelemetry.io.RequestCallback;
+import org.apache.nifi.processors.opentelemetry.io.RequestCallbackProvider;
+import
org.apache.nifi.processors.opentelemetry.protocol.TelemetryAttributeName;
+import org.apache.nifi.processors.opentelemetry.server.HttpServerFactory;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.SSLContextProvider;
@@ -69,7 +74,9 @@ import java.util.concurrent.LinkedBlockingQueue;
@WritesAttribute(attribute = TelemetryAttributeName.RESOURCE_TYPE,
description = "OpenTelemetry Resource Type: LOGS, METRICS, or TRACES"),
@WritesAttribute(attribute = TelemetryAttributeName.RESOURCE_COUNT,
description = "Count of resource elements included in messages"),
})
-public class ListenOTLP extends AbstractProcessor {
+public class ListenOTLP extends AbstractProcessor implements ListenComponent {
+
+ static final String[] OTLP_APPLICATION_PROTOCOLS = {"http/1.1", "h2",
"grpc", "otlp"};
static final PropertyDescriptor ADDRESS = new PropertyDescriptor.Builder()
.name("Address")
@@ -85,6 +92,7 @@ public class ListenOTLP extends AbstractProcessor {
.description("TCP port number on which to listen for OTLP Export
Service Requests over HTTP and gRPC")
.required(true)
.defaultValue("4317")
+ .identifiesListenPort(TransportProtocol.TCP,
OTLP_APPLICATION_PROTOCOLS)
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
@@ -172,6 +180,24 @@ public class ListenOTLP extends AbstractProcessor {
server = eventServerFactory.getEventServer();
}
+ @Override
+ public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
+ final Integer portNumber = context.getProperty(PORT).asInteger();
+ final List<ListenPort> ports;
+ if (portNumber == null) {
+ ports = List.of();
+ } else {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(portNumber)
+ .portName(PORT.getDisplayName())
+ .transportProtocol(TransportProtocol.TCP)
+ .applicationProtocols(List.of(OTLP_APPLICATION_PROTOCOLS))
+ .build();
+ ports = List.of(port);
+ }
+ return ports;
+ }
+
@OnStopped
public void onStopped() {
if (server != null) {
diff --git
a/nifi-extension-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/ListenTrapSNMP.java
b/nifi-extension-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/ListenTrapSNMP.java
index d7b325f635..476e6d2400 100644
---
a/nifi-extension-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/ListenTrapSNMP.java
+++
b/nifi-extension-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/ListenTrapSNMP.java
@@ -25,8 +25,13 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.listen.ListenComponent;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.components.listen.StandardListenPort;
+import org.apache.nifi.components.listen.TransportProtocol;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
@@ -70,13 +75,14 @@ import static
org.apache.nifi.snmp.processors.properties.BasicProperties.SNMP_V3
@WritesAttribute(attribute = SNMPUtils.SNMP_PROP_PREFIX + "*", description =
"Attributes retrieved from the SNMP response. It may include:"
+ " snmp$errorIndex, snmp$errorStatus, snmp$errorStatusText,
snmp$nonRepeaters, snmp$requestID, snmp$type, snmp$variableBindings")
@RequiresInstanceClassLoading
-public class ListenTrapSNMP extends AbstractSessionFactoryProcessor implements
VerifiableProcessor {
+public class ListenTrapSNMP extends AbstractSessionFactoryProcessor implements
VerifiableProcessor, ListenComponent {
public static final PropertyDescriptor SNMP_MANAGER_PORT = new
PropertyDescriptor.Builder()
.name("SNMP Manager Port")
.description("The port where the SNMP Manager listens to the
incoming traps.")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
+ .identifiesListenPort(TransportProtocol.UDP, "snmptrap")
.build();
public static final PropertyDescriptor SNMP_USM_USER_INPUT_METHOD = new
PropertyDescriptor.Builder()
@@ -199,6 +205,24 @@ public class ListenTrapSNMP extends
AbstractSessionFactoryProcessor implements V
return snmpTrapReceiverHandler.getListeningPort();
}
+ @Override
+ public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
+ final Integer portNumber =
context.getProperty(SNMP_MANAGER_PORT).asInteger();
+ final List<ListenPort> ports;
+ if (portNumber == null) {
+ ports = List.of();
+ } else {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(portNumber)
+ .portName(SNMP_MANAGER_PORT.getDisplayName())
+ .transportProtocol(TransportProtocol.UDP)
+ .applicationProtocols(List.of("snmptrap"))
+ .build();
+ ports = List.of(port);
+ }
+ return ports;
+ }
+
@Override
public void onTrigger(final ProcessContext context, final
ProcessSessionFactory processSessionFactory) {
if (!snmpTrapReceiverHandler.isStarted()) {
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index 46d4d81a26..e033edd666 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -40,6 +40,11 @@ import
org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.listen.ListenComponent;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.components.listen.StandardListenPort;
+import org.apache.nifi.components.listen.TransportProtocol;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.http.HttpContextMap;
@@ -150,7 +155,7 @@ import static
jakarta.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
@WritesAttribute(attribute = "http.multipart.fragments.total.number",
description = "For requests with Content-Type \"multipart/form-data\",
the count of all parts is recorded into this attribute.")})
@SeeAlso(value = {HandleHttpResponse.class})
-public class HandleHttpRequest extends AbstractProcessor {
+public class HandleHttpRequest extends AbstractProcessor implements
ListenComponent {
private static final String MIME_TYPE__MULTIPART_FORM_DATA =
"multipart/form-data";
@@ -171,6 +176,7 @@ public class HandleHttpRequest extends AbstractProcessor {
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .identifiesListenPort(TransportProtocol.TCP, "http/1.1", "h2")
.defaultValue("80")
.build();
public static final PropertyDescriptor HOSTNAME = new
PropertyDescriptor.Builder()
@@ -510,6 +516,36 @@ public class HandleHttpRequest extends AbstractProcessor {
ready = true;
}
+ @Override
+ public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
+
+ final List<ListenPort> ports = new ArrayList<>();
+
+ final Integer portNumber =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ final SSLContextProvider sslContextProvider =
context.getProperty(SSL_CONTEXT).asControllerService(SSLContextProvider.class);
+ final HttpProtocolStrategy httpProtocolStrategy = sslContextProvider
== null
+ ?
HttpProtocolStrategy.valueOf(HTTP_PROTOCOL_STRATEGY.getDefaultValue())
+ :
context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class);
+ final List<String> applicationProtocols = switch
(httpProtocolStrategy) {
+ case H2 -> List.of("h2");
+ case HTTP_1_1 -> List.of("http/1.1");
+ case H2_HTTP_1_1 -> List.of("h2", "http/1.1");
+ case null -> List.of("h2", "http/1.1");
+ };
+
+ if (portNumber != null) {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(portNumber)
+ .portName(PORT.getDisplayName())
+ .transportProtocol(TransportProtocol.TCP)
+ .applicationProtocols(applicationProtocols)
+ .build();
+ ports.add(port);
+ }
+
+ return ports;
+ }
+
protected int getPort() {
for (final Connector connector : server.getConnectors()) {
if (connector instanceof ServerConnector) {
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
index dbac07b26b..7ce37be1b0 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
@@ -27,6 +27,11 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.listen.ListenComponent;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.components.listen.StandardListenPort;
+import org.apache.nifi.components.listen.TransportProtocol;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
@@ -58,7 +63,7 @@ import java.util.concurrent.atomic.AtomicReference;
+ "E.g.: file.txt is uploaded to /Folder1/SubFolder, then the
value of the path attribute will be \"/Folder1/SubFolder/\" "
+ "(note that it ends with a separator character).")
})
-public class ListenFTP extends AbstractSessionFactoryProcessor {
+public class ListenFTP extends AbstractSessionFactoryProcessor implements
ListenComponent {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
.name("SSL Context Service")
@@ -90,6 +95,7 @@ public class ListenFTP extends
AbstractSessionFactoryProcessor {
.required(true)
.defaultValue("2221")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .identifiesListenPort(TransportProtocol.TCP, "ftp")
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
@@ -187,6 +193,24 @@ public class ListenFTP extends
AbstractSessionFactoryProcessor {
}
}
+ @Override
+ public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
+ final Integer portNumber =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ final List<ListenPort> ports;
+ if (portNumber == null) {
+ ports = List.of();
+ } else {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(portNumber)
+ .portName(PORT.getDisplayName())
+ .transportProtocol(TransportProtocol.TCP)
+ .applicationProtocols(List.of("ftp"))
+ .build();
+ ports = List.of(port);
+ }
+ return ports;
+ }
+
@OnStopped
public void stopFtpServer() {
if (ftpServer != null && !ftpServer.isStopped()) {
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index 19ad7d3703..8e84bc7bcd 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -25,6 +25,10 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.listen.ListenComponent;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.components.listen.StandardListenPort;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.SslSessionStatus;
@@ -33,6 +37,7 @@ import
org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
@@ -87,10 +92,19 @@ import java.util.concurrent.atomic.AtomicLong;
@WritesAttribute(attribute = "client.certificate.subject.dn",
description = "For connections using mutual TLS, the Distinguished Name of the
" +
"client certificate's owner (subject) is attached to the
FlowFile.")
})
-public class ListenTCP extends AbstractProcessor {
+public class ListenTCP extends AbstractProcessor implements ListenComponent {
private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE =
"client.certificate.subject.dn";
private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE =
"client.certificate.issuer.dn";
+ public static final PropertyDescriptor PORT = new PropertyDescriptor
+ .Builder().name("Port")
+ .description("The port to listen on for TCP connections.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+
.identifiesListenPort(org.apache.nifi.components.listen.TransportProtocol.TCP)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an
SSL Context. If this property is set, " +
@@ -127,7 +141,6 @@ public class ListenTCP extends AbstractProcessor {
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
ListenerProperties.NETWORK_INTF_NAME,
- ListenerProperties.PORT,
ListenerProperties.RECV_BUFFER_SIZE,
ListenerProperties.MAX_MESSAGE_QUEUE_SIZE,
ListenerProperties.MAX_SOCKET_BUFFER_SIZE,
@@ -135,6 +148,7 @@ public class ListenTCP extends AbstractProcessor {
ListenerProperties.WORKER_THREADS,
ListenerProperties.MAX_BATCH_SIZE,
ListenerProperties.MESSAGE_DELIMITER,
+ PORT,
IDLE_CONNECTION_TIMEOUT,
POOL_RECV_BUFFERS,
SSL_CONTEXT_SERVICE,
@@ -179,7 +193,7 @@ public class ListenTCP extends AbstractProcessor {
final String networkInterface =
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final InetAddress address =
NetworkUtils.getInterfaceAddress(networkInterface);
final Charset charset =
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
- port =
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+ port =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
eventsCapacity =
context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger();
events = new TrackingLinkedBlockingQueue<>(eventsCapacity);
errorEvents = new LinkedBlockingQueue<>();
@@ -211,6 +225,23 @@ public class ListenTCP extends AbstractProcessor {
}
}
+ @Override
+ public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
+ final Integer portNumber =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ final List<ListenPort> ports;
+ if (portNumber == null) {
+ ports = List.of();
+ } else {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(portNumber)
+ .portName(PORT.getDisplayName())
+
.transportProtocol(org.apache.nifi.components.listen.TransportProtocol.TCP)
+ .build();
+ ports = List.of(port);
+ }
+ return ports;
+ }
+
public int getListeningPort() {
return eventServer == null ? 0 : eventServer.getListeningPort();
}
@@ -320,4 +351,4 @@ public class ListenTCP extends AbstractProcessor {
nextTrackingLog.getAndSet(nextTrackingLogScheduled);
}
}
-}
\ No newline at end of file
+}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
index 711a709f50..79b2f63ec1 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
@@ -90,7 +90,7 @@ public class TestListenTCP {
@Test
public void testCustomValidate() throws Exception {
- runner.setProperty(ListenerProperties.PORT, "1");
+ runner.setProperty(ListenTCP.PORT, "1");
runner.assertValid();
enableSslContextService(keyStoreSslContext);
@@ -189,7 +189,7 @@ public class TestListenTCP {
}
private void run(final List<String> messages, final int flowFiles, final
SSLContext sslContext) throws Exception {
- runner.setProperty(ListenerProperties.PORT, "0");
+ runner.setProperty(ListenTCP.PORT, "0");
final String message = StringUtils.join(messages, null);
final byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
runner.run(1, false, true);
diff --git
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
index 45b09be390..bd47f2965d 100644
---
a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++
b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -16,31 +16,38 @@
*/
package org.apache.nifi.distributed.cache.server;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.listen.ListenComponent;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.components.listen.StandardListenPort;
+import org.apache.nifi.components.listen.TransportProtocol;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextProvider;
-public abstract class AbstractCacheServer extends AbstractControllerService {
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class AbstractCacheServer extends AbstractControllerService
implements ListenComponent {
public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
+ public static final String APPLICATION_PROTOCOL_NAME =
"nifi.apache.org/cache";
+
public static final PropertyDescriptor PORT = new
PropertyDescriptor.Builder()
.name("Port")
.description("The port to listen on for incoming connections")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
+ .identifiesListenPort(TransportProtocol.TCP, APPLICATION_PROTOCOL_NAME)
.defaultValue("4557")
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
@@ -100,6 +107,24 @@ public abstract class AbstractCacheServer extends
AbstractControllerService {
}
}
+ @Override
+ public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
+ final Integer portNumber = context.getProperty(PORT).asInteger();
+ final List<ListenPort> ports;
+ if (portNumber == null) {
+ ports = List.of();
+ } else {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(portNumber)
+ .portName(PORT.getDisplayName())
+ .transportProtocol(TransportProtocol.TCP)
+ .applicationProtocols(List.of(APPLICATION_PROTOCOL_NAME))
+ .build();
+ ports = List.of(port);
+ }
+ return ports;
+ }
+
@OnShutdown
@OnDisabled
public void shutdownServer() throws IOException {
diff --git
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
index 37deb027e2..2b33938dbe 100644
---
a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
+++
b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
@@ -25,6 +25,10 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.listen.ListenComponent;
+import org.apache.nifi.components.listen.ListenPort;
+import org.apache.nifi.components.listen.StandardListenPort;
+import org.apache.nifi.components.listen.TransportProtocol;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.ConfigurationContext;
@@ -60,7 +64,6 @@ import org.eclipse.jetty.util.resource.PathResourceFactory;
import org.eclipse.jetty.util.resource.Resource;
import javax.net.ssl.SSLContext;
-
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.file.Path;
@@ -79,7 +82,7 @@ import java.util.stream.Stream;
@CapabilityDescription("Implementation of WebSocketServerService." +
" This service uses Jetty WebSocket server module to provide" +
" WebSocket session management throughout the application.")
-public class JettyWebSocketServer extends AbstractJettyWebSocketService
implements WebSocketServerService {
+public class JettyWebSocketServer extends AbstractJettyWebSocketService
implements WebSocketServerService, ListenComponent {
/**
* A global map to refer a controller service instance by requested port
number.
@@ -113,6 +116,7 @@ public class JettyWebSocketServer extends
AbstractJettyWebSocketService implemen
.description("The port number on which this WebSocketServer
listens to.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .identifiesListenPort(TransportProtocol.TCP, "ws")
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
@@ -343,6 +347,24 @@ public class JettyWebSocketServer extends
AbstractJettyWebSocketService implemen
portToControllerService.put(listenPort, this);
}
+ @Override
+ public List<ListenPort> getListenPorts(final ConfigurationContext context)
{
+ final Integer portNumber =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ final List<ListenPort> ports;
+ if (portNumber == null) {
+ ports = List.of();
+ } else {
+ final ListenPort port = StandardListenPort.builder()
+ .portNumber(portNumber)
+ .portName(PORT.getDisplayName())
+ .transportProtocol(TransportProtocol.TCP)
+ .applicationProtocols(List.of("ws"))
+ .build();
+ ports = List.of(port);
+ }
+ return ports;
+ }
+
public int getListeningPort() {
return listenPort;
}