This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 00f385b  NIFI-8789 Corrected TestListenUDP to use getAvailableUdpPort()
00f385b is described below

commit 00f385b51bd4fda06269881ef7a64b4e59c68227
Author: exceptionfactory <[email protected]>
AuthorDate: Thu Jul 15 20:31:02 2021 -0500

    NIFI-8789 Corrected TestListenUDP to use getAvailableUdpPort()
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #5219.
---
 .../nifi/processors/standard/TestListenUDP.java    | 66 ++++++++++------------
 1 file changed, 31 insertions(+), 35 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
index bbbce1f..3fcfb4c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
@@ -31,17 +31,22 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
+import java.nio.channels.DatagramChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
+@RunWith(MockitoJUnitRunner.class)
 public class TestListenUDP {
 
     private static final String LOCALHOST = "localhost";
@@ -50,10 +55,13 @@ public class TestListenUDP {
 
     private TestRunner runner;
 
+    @Mock
+    private ChannelResponder<DatagramChannel> responder;
+
     @Before
     public void setUp() throws Exception {
         runner = TestRunners.newTestRunner(ListenUDP.class);
-        port = NetworkUtils.availablePort();
+        port = NetworkUtils.getAvailableUdpPort();
         runner.setProperty(ListenUDP.PORT, Integer.toString(port));
     }
 
@@ -62,7 +70,7 @@ public class TestListenUDP {
         runner.setProperty(ListenUDP.PORT, "1");
         runner.assertValid();
 
-        runner.setProperty(ListenUDP.SENDING_HOST, "localhost");
+        runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
         runner.assertNotValid();
 
         runner.setProperty(ListenUDP.SENDING_HOST_PORT, "1234");
@@ -75,12 +83,9 @@ public class TestListenUDP {
     @Test
     public void testDefaultBehavior() throws IOException, InterruptedException {
         final List<String> messages = getMessages(15);
-        final int expectedQueued = messages.size();
         final int expectedTransferred = messages.size();
 
-        // default behavior should produce a FlowFile per message sent
-
-        run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
+        run(new DatagramSocket(), messages, expectedTransferred);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -95,7 +100,7 @@ public class TestListenUDP {
 
         final List<String> messages = getMessages(20);
 
-        run(new DatagramSocket(), messages, maxQueueSize, maxQueueSize);
+        run(new DatagramSocket(), messages, maxQueueSize);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -110,10 +115,9 @@ public class TestListenUDP {
         runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "3");
 
         final List<String> messages = getMessages(5);
-        final int expectedQueued = messages.size();
         final int expectedTransferred = 2;
 
-        run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
+        run(new DatagramSocket(), messages, expectedTransferred);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, expectedTransferred);
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -131,55 +135,52 @@ public class TestListenUDP {
     public void testBatchingWithDifferentSenders() {
         final String sender1 = "sender1";
         final String sender2 = "sender2";
-        final ChannelResponder responder = Mockito.mock(ChannelResponder.class);
         final byte[] message = "test message".getBytes(StandardCharsets.UTF_8);
 
-        final List<StandardEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new StandardEvent(sender1, message, responder));
-        mockEvents.add(new StandardEvent(sender1, message, responder));
-        mockEvents.add(new StandardEvent(sender2, message, responder));
-        mockEvents.add(new StandardEvent(sender2, message, responder));
+        final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
+        mockEvents.add(new StandardEvent<>(sender1, message, responder));
+        mockEvents.add(new StandardEvent<>(sender1, message, responder));
+        mockEvents.add(new StandardEvent<>(sender2, message, responder));
+        mockEvents.add(new StandardEvent<>(sender2, message, responder));
 
         MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenUDP);
-        runner.setProperty(ListenRELP.PORT, "1");
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(ListenUDP.PORT, "1");
+        runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
 
         // sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
         runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 2);
 
         verifyProvenance(2);
     }
 
     @Test
     public void testRunWhenNoEventsAvailable() {
-        final List<StandardEvent> mockEvents = new ArrayList<>();
+        final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
 
         MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenUDP);
-        runner.setProperty(ListenRELP.PORT, "1");
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(ListenUDP.PORT, "1");
+        runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
 
         runner.run(5);
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
+        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
     }
 
     @Test
     public void testWithSendingHostAndPortSameAsSender() throws IOException, InterruptedException {
-        final String sendingHost = "localhost";
-        final Integer sendingPort = 21001;
-        runner.setProperty(ListenUDP.SENDING_HOST, sendingHost);
+        final Integer sendingPort = NetworkUtils.getAvailableUdpPort();
+        runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
         runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));
 
         // bind to the same sending port that processor has for Sending Host Port
         final DatagramSocket socket = new DatagramSocket(sendingPort);
 
         final List<String> messages = getMessages(6);
-        final int expectedQueued = messages.size();
         final int expectedTransferred = messages.size();
 
-        run(socket, messages, expectedQueued, expectedTransferred);
+        run(socket, messages, expectedTransferred);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
 
         List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@@ -214,11 +215,8 @@ public class TestListenUDP {
         }
     }
 
-    protected void run(final DatagramSocket socket, final List<String> messages, final int expectedQueueSize, final int expectedTransferred)
+    protected void run(final DatagramSocket socket, final List<String> messages, final int expectedTransferred)
             throws IOException, InterruptedException {
-
-
-
         // Run Processor and start Dispatcher without shutting down
         runner.run(1, false, true);
 
@@ -242,9 +240,9 @@ public class TestListenUDP {
     // Extend ListenUDP to mock the ChannelDispatcher and allow us to return staged events
     private static class MockListenUDP extends ListenUDP {
 
-        private List<StandardEvent> mockEvents;
+        private final List<StandardEvent<DatagramChannel>> mockEvents;
 
-        public MockListenUDP(List<StandardEvent> mockEvents) {
+        public MockListenUDP(List<StandardEvent<DatagramChannel>> mockEvents) {
             this.mockEvents = mockEvents;
         }
 
@@ -259,7 +257,5 @@ public class TestListenUDP {
         protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) {
             return Mockito.mock(ChannelDispatcher.class);
         }
-
     }
-
 }

Reply via email to