This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0024b1ddbd NIFI-15441 Added udp.sender.port FlowFile Attribute to ListenUDP (#10743)
0024b1ddbd is described below
commit 0024b1ddbde1dd795b33477036f83aca8d8738a6
Author: greyp9 <[email protected]>
AuthorDate: Thu Jan 15 12:32:03 2026 -0500
NIFI-15441 Added udp.sender.port FlowFile Attribute to ListenUDP (#10743)
Signed-off-by: David Handermann <[email protected]>
---
.../util/listen/dispatcher/DatagramChannelDispatcher.java | 8 +++++---
.../apache/nifi/processor/util/listen/event/EventFactory.java | 1 +
.../nifi/processor/util/listen/event/EventFactoryUtil.java | 3 ++-
.../nifi/processor/util/listen/event/StandardEvent.java | 10 ++++++++++
.../processor/util/listen/event/StandardEventFactory.java | 4 +++-
.../apache/nifi/processor/util/listen/EventBatcherTest.java | 4 ++--
.../java/org/apache/nifi/processors/standard/ListenUDP.java | 11 ++++++++---
.../org/apache/nifi/processors/standard/TestListenUDP.java | 11 +++++++----
8 files changed, 38 insertions(+), 14 deletions(-)
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
index 7e565aaabf..7b7306b3c4 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
@@ -124,8 +124,10 @@ public class DatagramChannelDispatcher<E extends
Event<DatagramChannel>> impleme
buffer.clear();
while (!stopped && (socketAddress =
channel.receive(buffer)) != null) {
String sender = "";
- if (socketAddress instanceof InetSocketAddress) {
- sender = ((InetSocketAddress)
socketAddress).getAddress().toString();
+ int port = 0;
+ if (socketAddress instanceof InetSocketAddress
inetSocketAddress) {
+ sender =
inetSocketAddress.getAddress().toString();
+ port = inetSocketAddress.getPort();
}
// create a byte array from the buffer
@@ -133,7 +135,7 @@ public class DatagramChannelDispatcher<E extends
Event<DatagramChannel>> impleme
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes, 0, buffer.limit());
- final Map<String, String> metadata =
EventFactoryUtil.createMapWithSender(sender);
+ final Map<String, String> metadata =
EventFactoryUtil.createMapWithSender(sender, port);
final E event = eventFactory.create(bytes,
metadata, null);
events.offer(event);
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
index 1bd9f0da61..dbfb3391fd 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
@@ -29,6 +29,7 @@ public interface EventFactory<E extends Event> {
* The key in the metadata map for the sender.
*/
String SENDER_KEY = "sender";
+ String SENDER_PORT_KEY = "senderPort";
/**
* Creates an event for the given data and metadata.
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
index 9f27b8c034..9ca758918d 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
@@ -24,9 +24,10 @@ import java.util.Map;
*/
public class EventFactoryUtil {
- public static Map<String, String> createMapWithSender(final String sender)
{
+ public static Map<String, String> createMapWithSender(final String sender,
final int port) {
Map<String, String> metadata = new HashMap<>();
metadata.put(EventFactory.SENDER_KEY, sender);
+ metadata.put(EventFactory.SENDER_PORT_KEY, String.valueOf(port));
return metadata;
}
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
index 925cb8f122..cfa62e6691 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
@@ -26,11 +26,17 @@ import java.nio.channels.SelectableChannel;
public class StandardEvent<C extends SelectableChannel> implements Event<C> {
private final String sender;
+ private final String senderPort;
private final byte[] data;
private final ChannelResponder<C> responder;
public StandardEvent(final String sender, final byte[] data, final
ChannelResponder<C> responder) {
+ this(sender, null, data, responder);
+ }
+
+ public StandardEvent(final String sender, final String senderPort, final
byte[] data, final ChannelResponder<C> responder) {
this.sender = sender;
+ this.senderPort = senderPort;
this.data = data;
this.responder = responder;
}
@@ -40,6 +46,10 @@ public class StandardEvent<C extends SelectableChannel>
implements Event<C> {
return sender;
}
+ public String getSenderPort() {
+ return senderPort;
+ }
+
@Override
public byte[] getData() {
return data;
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
index dce31f12e5..a0fc861695 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
@@ -29,10 +29,12 @@ public class StandardEventFactory<T extends Event<?>>
implements EventFactory<St
@Override
public StandardEvent create(final byte[] data, final Map<String, String>
metadata, final ChannelResponder responder) {
String sender = null;
+ String senderPort = null;
if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY))
{
sender = metadata.get(EventFactory.SENDER_KEY);
+ senderPort = metadata.get(EventFactory.SENDER_PORT_KEY);
}
- return new StandardEvent(sender, data, responder);
+ return new StandardEvent(sender, senderPort, data, responder);
}
}
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
index 0ffc62fd0a..7171d96595 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
@@ -73,8 +73,8 @@ public class EventBatcherTest {
public void testGetBatches() throws InterruptedException {
String sender1 = new InetSocketAddress(0).toString();
String sender2 = new InetSocketAddress(2).toString();
- final Map<String, String> sender1Metadata =
EventFactoryUtil.createMapWithSender(sender1);
- final Map<String, String> sender2Metadata =
EventFactoryUtil.createMapWithSender(sender2);
+ final Map<String, String> sender1Metadata =
EventFactoryUtil.createMapWithSender(sender1, 0);
+ final Map<String, String> sender2Metadata =
EventFactoryUtil.createMapWithSender(sender2, 2);
events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8),
sender1Metadata));
events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8),
sender1Metadata));
events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8),
sender1Metadata));
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index 64023707bd..25993e965c 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@ -60,7 +60,8 @@ import java.util.concurrent.BlockingQueue;
"for datagrams from all hosts and ports.")
@WritesAttributes({
@WritesAttribute(attribute = "udp.sender", description = "The sending
host of the messages."),
- @WritesAttribute(attribute = "udp.port", description = "The sending
port the messages were received.")
+ @WritesAttribute(attribute = "udp.sender.port", description = "The
sending port of the messages."),
+ @WritesAttribute(attribute = "udp.port", description = "The listening
port on which the messages were received.")
})
public class ListenUDP extends
AbstractListenEventBatchingProcessor<StandardEvent> {
@@ -87,6 +88,7 @@ public class ListenUDP extends
AbstractListenEventBatchingProcessor<StandardEven
public static final String UDP_PORT_ATTR = "udp.port";
public static final String UDP_SENDER_ATTR = "udp.sender";
+ public static final String UDP_SENDER_PORT_ATTR = "udp.sender.port";
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
@@ -131,9 +133,12 @@ public class ListenUDP extends
AbstractListenEventBatchingProcessor<StandardEven
@Override
protected Map<String, String> getAttributes(final FlowFileEventBatch
batch) {
- final String sender = batch.getEvents().getFirst().getSender();
- final Map<String, String> attributes = new HashMap<>(3);
+ final StandardEvent standardEvent = batch.getEvents().getFirst();
+ final String sender = standardEvent.getSender();
+ final String senderPort = standardEvent.getSenderPort();
+ final Map<String, String> attributes = new HashMap<>(4);
attributes.put(UDP_SENDER_ATTR, sender);
+ attributes.put(UDP_SENDER_PORT_ATTR, senderPort);
attributes.put(UDP_PORT_ATTR, String.valueOf(port));
return attributes;
}
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
index 9a4a542075..9cec642c42 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
@@ -136,13 +136,15 @@ public class TestListenUDP {
public void testBatchingWithDifferentSenders() {
final String sender1 = "sender1";
final String sender2 = "sender2";
+ final String senderPort1 = "senderPort1";
+ final String senderPort2 = "senderPort2";
final byte[] message = "test message".getBytes(StandardCharsets.UTF_8);
final List<StandardEvent<DatagramChannel>> mockEvents = new
ArrayList<>();
- mockEvents.add(new StandardEvent<>(sender1, message, responder));
- mockEvents.add(new StandardEvent<>(sender1, message, responder));
- mockEvents.add(new StandardEvent<>(sender2, message, responder));
- mockEvents.add(new StandardEvent<>(sender2, message, responder));
+ mockEvents.add(new StandardEvent<>(sender1, senderPort1, message,
responder));
+ mockEvents.add(new StandardEvent<>(sender1, senderPort1, message,
responder));
+ mockEvents.add(new StandardEvent<>(sender2, senderPort2, message,
responder));
+ mockEvents.add(new StandardEvent<>(sender2, senderPort2, message,
responder));
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
runner = TestRunners.newTestRunner(mockListenUDP);
@@ -213,6 +215,7 @@ public class TestListenUDP {
flowFile.assertContentEquals("This is message " + (i + 1));
assertEquals(String.valueOf(port),
flowFile.getAttribute(ListenUDP.UDP_PORT_ATTR));
assertTrue(StringUtils.isNotEmpty(flowFile.getAttribute(ListenUDP.UDP_SENDER_ATTR)));
+
assertTrue(StringUtils.isNumeric(flowFile.getAttribute(ListenUDP.UDP_SENDER_PORT_ATTR)));
}
}