Repository: nifi
Updated Branches:
  refs/heads/master 1a6802a01 -> f0856565a


NIFI-2615 polishing
- added missing POM entries to nifi POM and assembly POM
- added available port-discovery to tests amongs other minor polishings


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f0856565
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f0856565
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f0856565

Branch: refs/heads/master
Commit: f0856565ad3979148c82fcef2865340a40ab61f5
Parents: 3626abd
Author: Oleg Zhurakousky <[email protected]>
Authored: Wed Jan 25 13:13:53 2017 -0500
Committer: Oleg Zhurakousky <[email protected]>
Committed: Wed Jan 25 13:22:50 2017 -0500

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |  5 ++
 .../apache/nifi/processors/gettcp/GetTCP.java   |  9 +--
 .../nifi/processors/gettcp/ReceivingClient.java | 25 +--------
 .../processors/gettcp/ReceivingClientTest.java  | 59 +++++++++++---------
 .../apache/nifi/processors/gettcp/Server.java   |  2 +-
 .../nifi/processors/gettcp/TestGetTCP.java      | 44 +++++++++++----
 nifi-nar-bundles/nifi-tcp-bundle/pom.xml        |  1 -
 pom.xml                                         |  6 ++
 8 files changed, 88 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f0856565/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index abe5a58..e015e03 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -418,6 +418,11 @@ language governing permissions and limitations under the 
License. -->
             <artifactId>nifi-websocket-processors-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-tcp-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f0856565/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java
 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java
index c366929..172a4f9 100644
--- 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java
+++ 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/GetTCP.java
@@ -40,6 +40,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
@@ -90,10 +91,10 @@ public class GetTCP extends AbstractSessionFactoryProcessor 
{
     public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
             .name("receive-buffer-size")
             .displayName("Receive Buffer Size")
-            .description("The size of the buffer to receive data in")
+            .description("The size of the buffer to receive data in. Default 
16384 (16MB).")
             .required(false)
-            .defaultValue("2048")
-            .addValidator(StandardValidators.createLongValidator(1, 2048, 
true))
+            .defaultValue("16MB")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor END_OF_MESSAGE_BYTE = new 
PropertyDescriptor.Builder()
@@ -175,7 +176,7 @@ public class GetTCP extends AbstractSessionFactoryProcessor 
{
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws 
ProcessException {
-        this.receiveBufferSize = 
context.getProperty(RECEIVE_BUFFER_SIZE).asInteger();
+        this.receiveBufferSize = 
context.getProperty(RECEIVE_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         this.originalServerAddressList = 
context.getProperty(ENDPOINT_LIST).getValue();
         this.endOfMessageByte = ((byte) 
context.getProperty(END_OF_MESSAGE_BYTE).asInteger().intValue());
         this.connectionAttemptCount = 
context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();

http://git-wip-us.apache.org/repos/asf/nifi/blob/f0856565/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/ReceivingClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/ReceivingClient.java
 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/ReceivingClient.java
index 94c2d27..2fb7c33 100644
--- 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/ReceivingClient.java
+++ 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/main/java/org/apache/nifi/processors/gettcp/ReceivingClient.java
@@ -29,12 +29,10 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * Implementation of receiving network client.
  */
-public class ReceivingClient extends AbstractSocketHandler {
+class ReceivingClient extends AbstractSocketHandler {
 
     private final ScheduledExecutorService connectionScheduler;
 
-    private volatile InetSocketAddress backupAddress;
-
     private volatile int reconnectAttempts;
 
     private volatile long delayMillisBeforeReconnect;
@@ -48,10 +46,6 @@ public class ReceivingClient extends AbstractSocketHandler {
         this.connectionScheduler = connectionScheduler;
     }
 
-    public void setBackupAddress(InetSocketAddress backupAddress) {
-        this.backupAddress = backupAddress;
-    }
-
     public void setReconnectAttempts(int reconnectAttempts) {
         this.reconnectAttempts = reconnectAttempts;
     }
@@ -89,21 +83,8 @@ public class ReceivingClient extends AbstractSocketHandler {
                         }
                         connectionScheduler.schedule(this, 
delayMillisBeforeReconnect, TimeUnit.MILLISECONDS);
                     } else {
-                        if (backupAddress == null) {
-                            connectionError.set(e);
-                        } else {
-                            try {
-                                if (logger.isInfoEnabled()) {
-                                    logger.info("Every attempt to connect to 
'" + address + "' has failed.");
-                                    logger.info("Attempting to conect to 
secondary endppoint '" + backupAddress + "'.");
-                                }
-                                rootChannel = doConnect(backupAddress);
-                                connectedAddress = backupAddress;
-                            } catch (Exception re) {
-                                logger.error("Failed to connect to secondary 
endpoint.");
-                                connectionError.set(re);
-                            }
-                        }
+                        connectionError.set(e);
+                        logger.error("Failed to connect to secondary 
endpoint.");
                         latch.countDown();
                     }
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f0856565/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/ReceivingClientTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/ReceivingClientTest.java
 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/ReceivingClientTest.java
index fa0dd75..4c25c22 100644
--- 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/ReceivingClientTest.java
+++ 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/ReceivingClientTest.java
@@ -3,10 +3,11 @@ package org.apache.nifi.processors.gettcp;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -17,8 +18,12 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
+@Ignore // Ignored for full build due to artificial delays given the
+        // multi-threaded nature of most of the tests. Please un-Ignore and run
+        // when working on changes
 public class ReceivingClientTest {
 
     private final static byte EOM = '\r';
@@ -37,8 +42,9 @@ public class ReceivingClientTest {
 
     @Test
     public void validateSuccessfullConnectionAndCommunication() throws 
Exception {
+        int port = this.availablePort();
         String msgToSend = "Hello from 
validateSuccessfullConnectionAndCommunication";
-        InetSocketAddress address = new InetSocketAddress(9999);
+        InetSocketAddress address = new InetSocketAddress(port);
         Server server = new Server(address, 1024, EOM);
         server.start();
 
@@ -63,8 +69,9 @@ public class ReceivingClientTest {
 
     @Test
     public void 
validateSuccessfullConnectionAndCommunicationWithClientBufferSmallerThenMessage()
 throws Exception {
+        int port = this.availablePort();
         String msgToSend = "Hello from 
validateSuccessfullConnectionAndCommunicationWithClientBufferSmallerThenMessage";
-        InetSocketAddress address = new InetSocketAddress(9999);
+        InetSocketAddress address = new InetSocketAddress(port);
         Server server = new Server(address, 1024, EOM);
         server.start();
 
@@ -88,8 +95,9 @@ public class ReceivingClientTest {
 
     @Test
     public void 
validateMessageSendBeforeAfterClientConnectDisconnectNoEndOfMessageByte() 
throws Exception {
+        int port = this.availablePort();
         String msgToSend = "Hello from 
validateMessageSendBeforeAfterClientConnectDisconnectNoEndOfMessageByte";
-        InetSocketAddress address = new InetSocketAddress(9999);
+        InetSocketAddress address = new InetSocketAddress(port);
         Server server = new Server(address, 1024, EOM);
         server.start();
         this.sendToSocket(address, "foo"); // validates no unexpected errors
@@ -121,15 +129,15 @@ public class ReceivingClientTest {
 
     @Test
     public void validateReconnectDuringReceive() throws Exception {
+        int port = this.availablePort();
         String msgToSend = "Hello from validateReconnectDuringReceive\r";
-        InetSocketAddress addressMain = new InetSocketAddress(9998);
+        InetSocketAddress addressMain = new InetSocketAddress(port);
         Server server = new Server(addressMain, 1024, EOM);
         server.start();
 
         ExecutorService sendingExecutor = Executors.newSingleThreadExecutor();
 
         ReceivingClient client = new ReceivingClient(addressMain, 
this.scheduler, 1024, EOM);
-        client.setBackupAddress(addressMain);
         client.setReconnectAttempts(10);
         client.setDelayMillisBeforeReconnect(1000);
         client.setMessageHandler((fromAddress, message, partialMessage) -> 
System.out.println(new String(message)));
@@ -170,25 +178,6 @@ public class ReceivingClientTest {
         assertFalse(server.isRunning());
     }
 
-    @Test
-    public void validateConnectionFailureAfterRetries() throws Exception {
-        ReceivingClient client = null;
-        try {
-            InetSocketAddress addressMain = new InetSocketAddress(9998);
-            InetSocketAddress addressSecondary = new InetSocketAddress(9999);
-
-            client = new ReceivingClient(addressMain, this.scheduler, 1024, 
EOM);
-            client.setBackupAddress(addressSecondary);
-            client.setReconnectAttempts(5);
-            client.setDelayMillisBeforeReconnect(200);
-            client.start();
-            fail();
-        } catch (Exception e) {
-            assertTrue(e instanceof IllegalStateException);
-        }
-        assertFalse(client.isRunning());
-    }
-
     private void sendToSocket(InetSocketAddress address, String message) 
throws Exception {
         Socket socket = new Socket(address.getAddress(), address.getPort());
         PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
@@ -196,4 +185,24 @@ public class ReceivingClientTest {
         out.flush();
         socket.close();
     }
+
+    /**
+     * Will determine the available port used by test server.
+     */
+    private int availablePort() {
+        ServerSocket s = null;
+        try {
+            s = new ServerSocket(0);
+            s.setReuseAddress(true);
+            return s.getLocalPort();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to discover available 
port.", e);
+        } finally {
+            try {
+                s.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f0856565/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/Server.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/Server.java
 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/Server.java
index 550d2cb..6c38ad3 100644
--- 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/Server.java
+++ 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/Server.java
@@ -29,7 +29,7 @@ public class Server extends AbstractSocketHandler {
 
     public static void main(String[] args) throws Exception {
         InetSocketAddress address = new InetSocketAddress(9999);
-        Server server = new Server(address, 4096, (byte) '\r');
+        Server server = new Server(address, 4096, (byte) '\n');
         server.start();
         System.in.read();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f0856565/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/TestGetTCP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/TestGetTCP.java
 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/TestGetTCP.java
index 60868d0..bcc4092 100644
--- 
a/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/TestGetTCP.java
+++ 
b/nifi-nar-bundles/nifi-tcp-bundle/nifi-tcp-processors/src/test/java/org/apache/nifi/processors/gettcp/TestGetTCP.java
@@ -17,8 +17,10 @@
 
 package org.apache.nifi.processors.gettcp;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 
 import org.apache.nifi.util.TestRunner;
@@ -71,10 +73,11 @@ public final class TestGetTCP {
 
     @Test
     public void testSuccessInteraction() throws Exception {
-        Server server = setupTCPServer(9999);
-        testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:" + 9999);
+        int port = this.availablePort();
+        Server server = setupTCPServer(port);
+        testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:" + port);
         testRunner.run(1000, false);
-        this.sendToSocket(new InetSocketAddress(9999), "Hello\r");
+        this.sendToSocket(new InetSocketAddress(port), "Hello\r");
         Thread.sleep(200);
         testRunner.assertAllFlowFilesTransferred(GetTCP.REL_SUCCESS, 1);
         testRunner.clearTransferState();
@@ -84,17 +87,18 @@ public final class TestGetTCP {
 
     @Test
     public void testPartialInteraction() throws Exception {
-        Server server = setupTCPServer(9999);
-        testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:" + 9999);
-        testRunner.setProperty(GetTCP.RECEIVE_BUFFER_SIZE, "2");
+        int port = this.availablePort();
+        Server server = setupTCPServer(port);
+        testRunner.setProperty(GetTCP.ENDPOINT_LIST, "localhost:" + port);
+        testRunner.setProperty(GetTCP.RECEIVE_BUFFER_SIZE, "2B");
         testRunner.run(1000, false);
-        this.sendToSocket(new InetSocketAddress(9999), "Hello\r");
-        Thread.sleep(200);
+        this.sendToSocket(new InetSocketAddress(port), "Hello\r");
+        Thread.sleep(300);
         testRunner.assertAllFlowFilesTransferred(GetTCP.REL_PARTIAL, 3);
         testRunner.clearTransferState();
 
-        this.sendToSocket(new InetSocketAddress(9999), "H\r");
-        Thread.sleep(200);
+        this.sendToSocket(new InetSocketAddress(port), "H\r");
+        Thread.sleep(300);
         testRunner.assertAllFlowFilesTransferred(GetTCP.REL_SUCCESS, 1);
         testRunner.clearTransferState();
         testRunner.shutdown();
@@ -115,4 +119,24 @@ public final class TestGetTCP {
         out.flush();
         socket.close();
     }
+
+    /**
+     * Will determine the available port used by test server.
+     */
+    private int availablePort() {
+        ServerSocket s = null;
+        try {
+            s = new ServerSocket(0);
+            s.setReuseAddress(true);
+            return s.getLocalPort();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to discover available 
port.", e);
+        } finally {
+            try {
+                s.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f0856565/nifi-nar-bundles/nifi-tcp-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-tcp-bundle/pom.xml 
b/nifi-nar-bundles/nifi-tcp-bundle/pom.xml
index bf8821d..f4ec29b 100644
--- a/nifi-nar-bundles/nifi-tcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-tcp-bundle/pom.xml
@@ -24,7 +24,6 @@
 
 
     <artifactId>nifi-tcp-bundle</artifactId>
-    <version>1.2.0-SNAPSHOT</version>
     <packaging>pom</packaging>
 
     <modules>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f0856565/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 59a6e14..0fb3d1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1221,6 +1221,12 @@ language governing permissions and limitations under the 
License. -->
                 <version>1.2.0-SNAPSHOT</version>
                 <type>nar</type>
             </dependency>
+             <dependency>
+                 <groupId>org.apache.nifi</groupId>
+                 <artifactId>nifi-tcp-nar</artifactId>
+                 <version>1.2.0-SNAPSHOT</version>
+                 <type>nar</type>
+             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-splunk-nar</artifactId>

Reply via email to