This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0024b1ddbd NIFI-15441 Added udp.sender.port FlowFile Attribute to ListenUDP (#10743)
0024b1ddbd is described below

commit 0024b1ddbde1dd795b33477036f83aca8d8738a6
Author: greyp9 <[email protected]>
AuthorDate: Thu Jan 15 12:32:03 2026 -0500

    NIFI-15441 Added udp.sender.port FlowFile Attribute to ListenUDP (#10743)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../util/listen/dispatcher/DatagramChannelDispatcher.java     |  8 +++++---
 .../apache/nifi/processor/util/listen/event/EventFactory.java |  1 +
 .../nifi/processor/util/listen/event/EventFactoryUtil.java    |  3 ++-
 .../nifi/processor/util/listen/event/StandardEvent.java       | 10 ++++++++++
 .../processor/util/listen/event/StandardEventFactory.java     |  4 +++-
 .../apache/nifi/processor/util/listen/EventBatcherTest.java   |  4 ++--
 .../java/org/apache/nifi/processors/standard/ListenUDP.java   | 11 ++++++++---
 .../org/apache/nifi/processors/standard/TestListenUDP.java    | 11 +++++++----
 8 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
index 7e565aaabf..7b7306b3c4 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
@@ -124,8 +124,10 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
                         buffer.clear();
                         while (!stopped && (socketAddress = channel.receive(buffer)) != null) {
                             String sender = "";
-                            if (socketAddress instanceof InetSocketAddress) {
-                                sender = ((InetSocketAddress) socketAddress).getAddress().toString();
+                            int port = 0;
+                            if (socketAddress instanceof InetSocketAddress inetSocketAddress) {
+                                sender = inetSocketAddress.getAddress().toString();
+                                port = inetSocketAddress.getPort();
                             }
 
                             // create a byte array from the buffer
@@ -133,7 +135,7 @@ public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> impleme
                             byte[] bytes = new byte[buffer.limit()];
                             buffer.get(bytes, 0, buffer.limit());
 
-                            final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender);
+                            final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender, port);
                             final E event = eventFactory.create(bytes, metadata, null);
                             events.offer(event);
 
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
index 1bd9f0da61..dbfb3391fd 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
@@ -29,6 +29,7 @@ public interface EventFactory<E extends Event> {
      * The key in the metadata map for the sender.
      */
     String SENDER_KEY = "sender";
+    String SENDER_PORT_KEY = "senderPort";
 
     /**
      * Creates an event for the given data and metadata.
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
index 9f27b8c034..9ca758918d 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
@@ -24,9 +24,10 @@ import java.util.Map;
  */
 public class EventFactoryUtil {
 
-    public static Map<String, String> createMapWithSender(final String sender) {
+    public static Map<String, String> createMapWithSender(final String sender, final int port) {
         Map<String, String> metadata = new HashMap<>();
         metadata.put(EventFactory.SENDER_KEY, sender);
+        metadata.put(EventFactory.SENDER_PORT_KEY, String.valueOf(port));
         return metadata;
     }
 
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
index 925cb8f122..cfa62e6691 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
@@ -26,11 +26,17 @@ import java.nio.channels.SelectableChannel;
 public class StandardEvent<C extends SelectableChannel> implements Event<C> {
 
     private final String sender;
+    private final String senderPort;
     private final byte[] data;
     private final ChannelResponder<C> responder;
 
     public StandardEvent(final String sender, final byte[] data, final ChannelResponder<C> responder) {
+        this(sender, null, data, responder);
+    }
+
+    public StandardEvent(final String sender, final String senderPort, final byte[] data, final ChannelResponder<C> responder) {
         this.sender = sender;
+        this.senderPort = senderPort;
         this.data = data;
         this.responder = responder;
     }
@@ -40,6 +46,10 @@ public class StandardEvent<C extends SelectableChannel> implements Event<C> {
         return sender;
     }
 
+    public String getSenderPort() {
+        return senderPort;
+    }
+
     @Override
     public byte[] getData() {
         return data;
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
index dce31f12e5..a0fc861695 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
@@ -29,10 +29,12 @@ public class StandardEventFactory<T extends Event<?>> implements EventFactory<St
     @Override
     public StandardEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
         String sender = null;
+        String senderPort = null;
         if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) {
             sender = metadata.get(EventFactory.SENDER_KEY);
+            senderPort = metadata.get(EventFactory.SENDER_PORT_KEY);
         }
-        return new StandardEvent(sender, data, responder);
+        return new StandardEvent(sender, senderPort, data, responder);
     }
 
 }
diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
index 0ffc62fd0a..7171d96595 100644
--- a/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
+++ b/nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
@@ -73,8 +73,8 @@ public class EventBatcherTest {
     public void testGetBatches() throws InterruptedException {
         String sender1 = new InetSocketAddress(0).toString();
         String sender2 = new InetSocketAddress(2).toString();
-        final Map<String, String> sender1Metadata = EventFactoryUtil.createMapWithSender(sender1);
-        final Map<String, String> sender2Metadata = EventFactoryUtil.createMapWithSender(sender2);
+        final Map<String, String> sender1Metadata = EventFactoryUtil.createMapWithSender(sender1, 0);
+        final Map<String, String> sender2Metadata = EventFactoryUtil.createMapWithSender(sender2, 2);
         events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
         events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
         events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index 64023707bd..25993e965c 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@ -60,7 +60,8 @@ import java.util.concurrent.BlockingQueue;
         "for datagrams from all hosts and ports.")
 @WritesAttributes({
         @WritesAttribute(attribute = "udp.sender", description = "The sending host of the messages."),
-        @WritesAttribute(attribute = "udp.port", description = "The sending port the messages were received.")
+        @WritesAttribute(attribute = "udp.sender.port", description = "The sending port of the messages."),
+        @WritesAttribute(attribute = "udp.port", description = "The listening port on which the messages were received.")
 })
 public class ListenUDP extends AbstractListenEventBatchingProcessor<StandardEvent> {
 
@@ -87,6 +88,7 @@ public class ListenUDP extends AbstractListenEventBatchingProcessor<StandardEven
 
     public static final String UDP_PORT_ATTR = "udp.port";
     public static final String UDP_SENDER_ATTR = "udp.sender";
+    public static final String UDP_SENDER_PORT_ATTR = "udp.sender.port";
 
     @Override
     protected List<PropertyDescriptor> getAdditionalProperties() {
@@ -131,9 +133,12 @@ public class ListenUDP extends AbstractListenEventBatchingProcessor<StandardEven
 
     @Override
     protected Map<String, String> getAttributes(final FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().getFirst().getSender();
-        final Map<String, String> attributes = new HashMap<>(3);
+        final StandardEvent standardEvent = batch.getEvents().getFirst();
+        final String sender = standardEvent.getSender();
+        final String senderPort = standardEvent.getSenderPort();
+        final Map<String, String> attributes = new HashMap<>(4);
         attributes.put(UDP_SENDER_ATTR, sender);
+        attributes.put(UDP_SENDER_PORT_ATTR, senderPort);
         attributes.put(UDP_PORT_ATTR, String.valueOf(port));
         return attributes;
     }
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
index 9a4a542075..9cec642c42 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
@@ -136,13 +136,15 @@ public class TestListenUDP {
     public void testBatchingWithDifferentSenders() {
         final String sender1 = "sender1";
         final String sender2 = "sender2";
+        final String senderPort1 = "senderPort1";
+        final String senderPort2 = "senderPort2";
         final byte[] message = "test message".getBytes(StandardCharsets.UTF_8);
 
         final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
-        mockEvents.add(new StandardEvent<>(sender1, message, responder));
-        mockEvents.add(new StandardEvent<>(sender1, message, responder));
-        mockEvents.add(new StandardEvent<>(sender2, message, responder));
-        mockEvents.add(new StandardEvent<>(sender2, message, responder));
+        mockEvents.add(new StandardEvent<>(sender1, senderPort1, message, responder));
+        mockEvents.add(new StandardEvent<>(sender1, senderPort1, message, responder));
+        mockEvents.add(new StandardEvent<>(sender2, senderPort2, message, responder));
+        mockEvents.add(new StandardEvent<>(sender2, senderPort2, message, responder));
 
         MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenUDP);
@@ -213,6 +215,7 @@ public class TestListenUDP {
             flowFile.assertContentEquals("This is message " + (i + 1));
             assertEquals(String.valueOf(port), flowFile.getAttribute(ListenUDP.UDP_PORT_ATTR));
             assertTrue(StringUtils.isNotEmpty(flowFile.getAttribute(ListenUDP.UDP_SENDER_ATTR)));
+            assertTrue(StringUtils.isNumeric(flowFile.getAttribute(ListenUDP.UDP_SENDER_PORT_ATTR)));
         }
     }
 

Reply via email to