Repository: nifi
Updated Branches:
  refs/heads/master 7e61c6333 -> a774f1df6


NIFI-4673 changed offending tests to be integration tests and fixed travis 
config to run the build only once and during appropriate script phase instead 
of install. Reviewed by Bende.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a774f1df
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a774f1df
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a774f1df

Branch: refs/heads/master
Commit: a774f1df69b1d7fd6c3fda2acf7e0adf883def25
Parents: 7e61c63
Author: joewitt <[email protected]>
Authored: Wed Dec 6 12:03:16 2017 -0500
Committer: joewitt <[email protected]>
Committed: Wed Dec 6 16:13:42 2017 -0500

----------------------------------------------------------------------
 .travis.yml                                     |   7 +-
 .../standard/ITListenAndPutSyslog.java          | 175 +++++++++++
 .../standard/TestListenAndPutSyslog.java        | 175 -----------
 .../jetty/ITJettyWebSocketCommunication.java    | 306 +++++++++++++++++++
 .../ITJettyWebSocketSecureCommunication.java    |  68 +++++
 .../jetty/TestJettyWebSocketCommunication.java  | 306 -------------------
 .../TestJettyWebSocketSecureCommunication.java  |  68 -----
 7 files changed, 554 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index e5b0ccb..6bb35a1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,6 +31,7 @@ cache:
   directories:
     - $HOME/.m2
     - $HOME/.npm
+
 before_cache:
   # Remove nifi repo again to save travis from caching it
   - rm -rf $HOME/.m2/repository/org/apache/nifi/
@@ -47,9 +48,11 @@ before_install:
   # Remove nifi repo again to save travis from caching it
   - rm -rf $HOME/.m2/repository/org/apache/nifi/
 
-install:
+install: true
+    
+script:
   # Replace variables seems to be the only option to pass proper values to 
surefire
   # Note: The reason the sed is done as part of script is to ensure the pom 
hack 
   # won't affect the 'clean install' above
   - bash .travis.sh
-  - mvn -T 2C -Pcontrib-check -Ddir-only clean install
+  - mvn -T 2 clean install -Pcontrib-check -Ddir-only

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
new file mode 100644
index 0000000..5d0562d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITListenAndPutSyslog.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog 
server forwarding
+ * to ListenSyslog, or PutSyslog sending to a syslog server.
+ */
+public class ITListenAndPutSyslog {
+
+    static final Logger LOGGER = 
LoggerFactory.getLogger(ITListenAndPutSyslog.class);
+
+    private ListenSyslog listenSyslog;
+    private TestRunner listenSyslogRunner;
+
+    private PutSyslog putSyslog;
+    private TestRunner putSyslogRunner;
+
+    @Before
+    public void setup() {
+        this.listenSyslog = new ListenSyslog();
+        this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog);
+
+        this.putSyslog = new PutSyslog();
+        this.putSyslogRunner = TestRunners.newTestRunner(putSyslog);
+    }
+
+    @After
+    public void teardown() {
+        try {
+            putSyslog.onStopped();
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        }
+        try {
+            listenSyslog.onUnscheduled();
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        }
+    }
+
+    @Test
+    public void testUDP() throws IOException, InterruptedException {
+        run(ListenSyslog.UDP_VALUE.getValue(), 5, 5);
+    }
+
+    @Test
+    public void testTCP() throws IOException, InterruptedException {
+        run(ListenSyslog.TCP_VALUE.getValue(), 5, 5);
+    }
+
+    @Test
+    public void testTLS() throws InitializationException, IOException, 
InterruptedException {
+        configureSSLContextService(listenSyslogRunner);
+        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, 
"ssl-context");
+
+        configureSSLContextService(putSyslogRunner);
+        putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, 
"ssl-context");
+
+        run(ListenSyslog.TCP_VALUE.getValue(), 7, 7);
+    }
+
+    @Test
+    public void testTLSListenerNoTLSPut() throws InitializationException, 
IOException, InterruptedException {
+        configureSSLContextService(listenSyslogRunner);
+        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, 
"ssl-context");
+
+        // send 7 but expect 0 because sender didn't use TLS
+        run(ListenSyslog.TCP_VALUE.getValue(), 7, 0);
+    }
+
+    private SSLContextService configureSSLContextService(TestRunner runner) 
throws InitializationException {
+        final SSLContextService sslContextService = new 
StandardSSLContextService();
+        runner.addControllerService("ssl-context", sslContextService);
+        runner.setProperty(sslContextService, 
StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
+        runner.setProperty(sslContextService, 
StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, 
StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+        runner.setProperty(sslContextService, 
StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
+        runner.setProperty(sslContextService, 
StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, 
StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+        runner.enableControllerService(sslContextService);
+        return sslContextService;
+    }
+
+    /**
+     * Sends numMessages from PutSyslog to ListenSyslog.
+     */
+    private void run(String protocol, int numMessages, int expectedMessages) 
throws IOException, InterruptedException {
+        // set the same protocol on both processors
+        putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol);
+        listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol);
+
+        // set a listening port of 0 to get a random available port
+        listenSyslogRunner.setProperty(ListenSyslog.PORT, "0");
+
+        // call onScheduled to start ListenSyslog listening
+        final ProcessSessionFactory processSessionFactory = 
listenSyslogRunner.getProcessSessionFactory();
+        final ProcessContext context = listenSyslogRunner.getProcessContext();
+        listenSyslog.onScheduled(context);
+
+        // get the real port it is listening on and set that in PutSyslog
+        final int listeningPort = listenSyslog.getPort();
+        putSyslogRunner.setProperty(PutSyslog.PORT, 
String.valueOf(listeningPort));
+
+        // configure the message properties on PutSyslog
+        final String pri = "34";
+        final String version = "1";
+        final String stamp = "2016-02-05T22:14:15.003Z";
+        final String host = "localhost";
+        final String body = "some message";
+        final String expectedMessage = "<" + pri + ">" + version + " " + stamp 
+ " " + host + " " + body;
+
+        putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri);
+        putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version);
+        putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
+        putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host);
+        putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body);
+
+        // send the messages
+        for (int i=0; i < numMessages; i++) {
+            putSyslogRunner.enqueue("incoming 
data".getBytes(Charset.forName("UTF-8")));
+        }
+        putSyslogRunner.run(numMessages, false);
+
+        // trigger ListenSyslog until we've seen all the messages
+        int numTransfered = 0;
+        long timeout = System.currentTimeMillis() + 30000;
+
+        while (numTransfered < expectedMessages && System.currentTimeMillis() 
< timeout) {
+            Thread.sleep(10);
+            listenSyslog.onTrigger(context, processSessionFactory);
+            numTransfered = 
listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
+        }
+        Assert.assertEquals("Did not process all the messages", 
expectedMessages, numTransfered);
+
+        if (expectedMessages > 0) {
+            // check that one of flow files has the expected content
+            MockFlowFile mockFlowFile = 
listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
+            mockFlowFile.assertContentEquals(expectedMessage);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
deleted file mode 100644
index 29fa690..0000000
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenAndPutSyslog.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-/**
- * Tests PutSyslog sending messages to ListenSyslog to simulate a syslog 
server forwarding
- * to ListenSyslog, or PutSyslog sending to a syslog server.
- */
-public class TestListenAndPutSyslog {
-
-    static final Logger LOGGER = 
LoggerFactory.getLogger(TestListenAndPutSyslog.class);
-
-    private ListenSyslog listenSyslog;
-    private TestRunner listenSyslogRunner;
-
-    private PutSyslog putSyslog;
-    private TestRunner putSyslogRunner;
-
-    @Before
-    public void setup() {
-        this.listenSyslog = new ListenSyslog();
-        this.listenSyslogRunner = TestRunners.newTestRunner(listenSyslog);
-
-        this.putSyslog = new PutSyslog();
-        this.putSyslogRunner = TestRunners.newTestRunner(putSyslog);
-    }
-
-    @After
-    public void teardown() {
-        try {
-            putSyslog.onStopped();
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
-        }
-        try {
-            listenSyslog.onUnscheduled();
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
-        }
-    }
-
-    @Test
-    public void testUDP() throws IOException, InterruptedException {
-        run(ListenSyslog.UDP_VALUE.getValue(), 5, 5);
-    }
-
-    @Test
-    public void testTCP() throws IOException, InterruptedException {
-        run(ListenSyslog.TCP_VALUE.getValue(), 5, 5);
-    }
-
-    @Test
-    public void testTLS() throws InitializationException, IOException, 
InterruptedException {
-        configureSSLContextService(listenSyslogRunner);
-        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, 
"ssl-context");
-
-        configureSSLContextService(putSyslogRunner);
-        putSyslogRunner.setProperty(PutSyslog.SSL_CONTEXT_SERVICE, 
"ssl-context");
-
-        run(ListenSyslog.TCP_VALUE.getValue(), 7, 7);
-    }
-
-    @Test
-    public void testTLSListenerNoTLSPut() throws InitializationException, 
IOException, InterruptedException {
-        configureSSLContextService(listenSyslogRunner);
-        listenSyslogRunner.setProperty(ListenSyslog.SSL_CONTEXT_SERVICE, 
"ssl-context");
-
-        // send 7 but expect 0 because sender didn't use TLS
-        run(ListenSyslog.TCP_VALUE.getValue(), 7, 0);
-    }
-
-    private SSLContextService configureSSLContextService(TestRunner runner) 
throws InitializationException {
-        final SSLContextService sslContextService = new 
StandardSSLContextService();
-        runner.addControllerService("ssl-context", sslContextService);
-        runner.setProperty(sslContextService, 
StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
-        runner.setProperty(sslContextService, 
StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
-        runner.setProperty(sslContextService, 
StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
-        runner.setProperty(sslContextService, 
StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
-        runner.setProperty(sslContextService, 
StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
-        runner.setProperty(sslContextService, 
StandardSSLContextService.KEYSTORE_TYPE, "JKS");
-        runner.enableControllerService(sslContextService);
-        return sslContextService;
-    }
-
-    /**
-     * Sends numMessages from PutSyslog to ListenSyslog.
-     */
-    private void run(String protocol, int numMessages, int expectedMessages) 
throws IOException, InterruptedException {
-        // set the same protocol on both processors
-        putSyslogRunner.setProperty(PutSyslog.PROTOCOL, protocol);
-        listenSyslogRunner.setProperty(ListenSyslog.PROTOCOL, protocol);
-
-        // set a listening port of 0 to get a random available port
-        listenSyslogRunner.setProperty(ListenSyslog.PORT, "0");
-
-        // call onScheduled to start ListenSyslog listening
-        final ProcessSessionFactory processSessionFactory = 
listenSyslogRunner.getProcessSessionFactory();
-        final ProcessContext context = listenSyslogRunner.getProcessContext();
-        listenSyslog.onScheduled(context);
-
-        // get the real port it is listening on and set that in PutSyslog
-        final int listeningPort = listenSyslog.getPort();
-        putSyslogRunner.setProperty(PutSyslog.PORT, 
String.valueOf(listeningPort));
-
-        // configure the message properties on PutSyslog
-        final String pri = "34";
-        final String version = "1";
-        final String stamp = "2016-02-05T22:14:15.003Z";
-        final String host = "localhost";
-        final String body = "some message";
-        final String expectedMessage = "<" + pri + ">" + version + " " + stamp 
+ " " + host + " " + body;
-
-        putSyslogRunner.setProperty(PutSyslog.MSG_PRIORITY, pri);
-        putSyslogRunner.setProperty(PutSyslog.MSG_VERSION, version);
-        putSyslogRunner.setProperty(PutSyslog.MSG_TIMESTAMP, stamp);
-        putSyslogRunner.setProperty(PutSyslog.MSG_HOSTNAME, host);
-        putSyslogRunner.setProperty(PutSyslog.MSG_BODY, body);
-
-        // send the messages
-        for (int i=0; i < numMessages; i++) {
-            putSyslogRunner.enqueue("incoming 
data".getBytes(Charset.forName("UTF-8")));
-        }
-        putSyslogRunner.run(numMessages, false);
-
-        // trigger ListenSyslog until we've seen all the messages
-        int numTransfered = 0;
-        long timeout = System.currentTimeMillis() + 30000;
-
-        while (numTransfered < expectedMessages && System.currentTimeMillis() 
< timeout) {
-            Thread.sleep(10);
-            listenSyslog.onTrigger(context, processSessionFactory);
-            numTransfered = 
listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).size();
-        }
-        Assert.assertEquals("Did not process all the messages", 
expectedMessages, numTransfered);
-
-        if (expectedMessages > 0) {
-            // check that one of flow files has the expected content
-            MockFlowFile mockFlowFile = 
listenSyslogRunner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            mockFlowFile.assertContentEquals(expectedMessage);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
new file mode 100644
index 0000000..6d3f063
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketCommunication.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.websocket.BinaryMessageConsumer;
+import org.apache.nifi.websocket.ConnectedListener;
+import org.apache.nifi.websocket.TextMessageConsumer;
+import org.apache.nifi.websocket.WebSocketClientService;
+import org.apache.nifi.websocket.WebSocketServerService;
+import org.apache.nifi.websocket.WebSocketSessionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+
+public class ITJettyWebSocketCommunication {
+
+    protected int serverPort;
+    protected String serverPath = "/test";
+    protected WebSocketServerService serverService;
+    protected ControllerServiceTestContext serverServiceContext;
+    protected WebSocketClientService clientService;
+    protected ControllerServiceTestContext clientServiceContext;
+
+    protected boolean isSecure() {
+        return false;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        setupServer();
+
+        setupClient();
+    }
+
+    private void setupServer() throws Exception {
+        // Find an open port.
+        try (final ServerSocket serverSocket = new ServerSocket(0)) {
+            serverPort = serverSocket.getLocalPort();
+        }
+        serverService = new JettyWebSocketServer();
+        serverServiceContext = new ControllerServiceTestContext(serverService, 
"JettyWebSocketServer1");
+        serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, 
String.valueOf(serverPort));
+
+        customizeServer();
+
+        
serverService.initialize(serverServiceContext.getInitializationContext());
+        
serverService.startServer(serverServiceContext.getConfigurationContext());
+    }
+
+    protected void customizeServer() {
+    }
+
+    private void setupClient() throws Exception {
+        clientService = new JettyWebSocketClient();
+        clientServiceContext = new ControllerServiceTestContext(clientService, 
"JettyWebSocketClient1");
+        clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, 
(isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
+
+        customizeClient();
+
+        
clientService.initialize(clientServiceContext.getInitializationContext());
+        
clientService.startClient(clientServiceContext.getConfigurationContext());
+    }
+
+    protected void customizeClient() {
+    }
+
+    @After
+    public void teardown() throws Exception {
+        clientService.stopClient();
+        serverService.stopServer();
+    }
+
+    protected interface MockWebSocketProcessor extends Processor, 
ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
+    }
+
+    private boolean isWindowsEnvironment() {
+        return 
System.getProperty("os.name").toLowerCase().startsWith("windows");
+    }
+
+    @Test
+    public void testClientServerCommunication() throws Exception {
+        assumeFalse(isWindowsEnvironment());
+        // Expectations.
+        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+        final CountDownLatch serverReceivedTextMessageFromClient = new 
CountDownLatch(1);
+        final CountDownLatch serverReceivedBinaryMessageFromClient = new 
CountDownLatch(1);
+        final CountDownLatch clientReceivedTextMessageFromServer = new 
CountDownLatch(1);
+        final CountDownLatch clientReceivedBinaryMessageFromServer = new 
CountDownLatch(1);
+
+        final String textMessageFromClient = "Message from client.";
+        final String textMessageFromServer = "Message from server.";
+
+        final MockWebSocketProcessor serverProcessor = 
mock(MockWebSocketProcessor.class);
+        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+        final AtomicReference<String> serverSessionIdRef = new 
AtomicReference<>();
+
+        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, 
serverSessionIdRef, invocation))
+                
.when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> 
assertConsumeTextMessage(serverReceivedTextMessageFromClient, 
textMessageFromClient, invocation))
+                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> 
assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, 
textMessageFromClient, invocation))
+                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
+
+        serverService.registerProcessor(serverPath, serverProcessor);
+
+        final String clientId = "client1";
+
+        final MockWebSocketProcessor clientProcessor = 
mock(MockWebSocketProcessor.class);
+        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+        final AtomicReference<String> clientSessionIdRef = new 
AtomicReference<>();
+
+
+        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, 
clientSessionIdRef, invocation))
+                
.when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> 
assertConsumeTextMessage(clientReceivedTextMessageFromServer, 
textMessageFromServer, invocation))
+                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> 
assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, 
textMessageFromServer, invocation))
+                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
+
+        clientService.registerProcessor(clientId, clientProcessor);
+
+        clientService.connect(clientId);
+
+        assertTrue("WebSocket client should be able to fire connected event.", 
clientConnectedServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to fire connected event.", 
serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromClient));
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+
+        assertTrue("WebSocket server should be able to consume text message.", 
serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to consume binary 
message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromServer));
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+        assertTrue("WebSocket client should be able to consume text message.", 
clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket client should be able to consume binary 
message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+        clientService.deregisterProcessor(clientId, clientProcessor);
+        serverService.deregisterProcessor(serverPath, serverProcessor);
+    }
+
+    @Test
+    public void testClientServerCommunicationRecovery() throws Exception {
+        assumeFalse(isWindowsEnvironment());
+        // Expectations.
+        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
+        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
+        final CountDownLatch serverReceivedTextMessageFromClient = new 
CountDownLatch(1);
+        final CountDownLatch serverReceivedBinaryMessageFromClient = new 
CountDownLatch(1);
+        final CountDownLatch clientReceivedTextMessageFromServer = new 
CountDownLatch(1);
+        final CountDownLatch clientReceivedBinaryMessageFromServer = new 
CountDownLatch(1);
+
+        final String textMessageFromClient = "Message from client.";
+        final String textMessageFromServer = "Message from server.";
+
+        final MockWebSocketProcessor serverProcessor = 
mock(MockWebSocketProcessor.class);
+        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
+        final AtomicReference<String> serverSessionIdRef = new 
AtomicReference<>();
+
+        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, 
serverSessionIdRef, invocation))
+                
.when(serverProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> 
assertConsumeTextMessage(serverReceivedTextMessageFromClient, 
textMessageFromClient, invocation))
+                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> 
assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, 
textMessageFromClient, invocation))
+                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
+
+        serverService.registerProcessor(serverPath, serverProcessor);
+
+        final String clientId = "client1";
+
+        final MockWebSocketProcessor clientProcessor = 
mock(MockWebSocketProcessor.class);
+        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
+        final AtomicReference<String> clientSessionIdRef = new 
AtomicReference<>();
+
+
+        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, 
clientSessionIdRef, invocation))
+                
.when(clientProcessor).connected(any(WebSocketSessionInfo.class));
+
+        doAnswer(invocation -> 
assertConsumeTextMessage(clientReceivedTextMessageFromServer, 
textMessageFromServer, invocation))
+                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
+
+        doAnswer(invocation -> 
assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, 
textMessageFromServer, invocation))
+                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
+
+        clientService.registerProcessor(clientId, clientProcessor);
+
+        clientService.connect(clientId);
+
+        assertTrue("WebSocket client should be able to fire connected event.", 
clientConnectedServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to fire connected event.", 
serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
+
+        // Nothing happens if maintenance is executed while sessions are alive.
+        ((JettyWebSocketClient) clientService).maintainSessions();
+
+        // Restart server.
+        serverService.stopServer();
+        
serverService.startServer(serverServiceContext.getConfigurationContext());
+
+        // Sessions will be recreated with the same session ids.
+        ((JettyWebSocketClient) clientService).maintainSessions();
+
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromClient));
+        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
+
+        assertTrue("WebSocket server should be able to consume text message.", 
serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket server should be able to consume binary 
message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
+
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromServer));
+        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
+
+        assertTrue("WebSocket client should be able to consume text message.", 
clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
+        assertTrue("WebSocket client should be able to consume binary 
message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
+
+        clientService.deregisterProcessor(clientId, clientProcessor);
+        serverService.deregisterProcessor(serverPath, serverProcessor);
+    }
+
+    protected Object assertConnectedEvent(CountDownLatch latch, 
AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
+        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, 
WebSocketSessionInfo.class);
+        assertNotNull(sessionInfo.getLocalAddress());
+        assertNotNull(sessionInfo.getRemoteAddress());
+        assertNotNull(sessionInfo.getSessionId());
+        assertEquals(isSecure(), sessionInfo.isSecure());
+        sessionIdRef.set(sessionInfo.getSessionId());
+        latch.countDown();
+        return null;
+    }
+
+    protected Object assertConsumeTextMessage(CountDownLatch latch, String 
expectedMessage, InvocationOnMock invocation) {
+        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, 
WebSocketSessionInfo.class);
+        assertNotNull(sessionInfo.getLocalAddress());
+        assertNotNull(sessionInfo.getRemoteAddress());
+        assertNotNull(sessionInfo.getSessionId());
+        assertEquals(isSecure(), sessionInfo.isSecure());
+
+        final String receivedMessage = invocation.getArgumentAt(1, 
String.class);
+        assertNotNull(receivedMessage);
+        assertEquals(expectedMessage, receivedMessage);
+        latch.countDown();
+        return null;
+    }
+
+    protected Object assertConsumeBinaryMessage(CountDownLatch latch, String 
expectedMessage, InvocationOnMock invocation) {
+        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, 
WebSocketSessionInfo.class);
+        assertNotNull(sessionInfo.getLocalAddress());
+        assertNotNull(sessionInfo.getRemoteAddress());
+        assertNotNull(sessionInfo.getSessionId());
+        assertEquals(isSecure(), sessionInfo.isSecure());
+
+        final byte[] receivedMessage = invocation.getArgumentAt(1, 
byte[].class);
+        final byte[] expectedBinary = expectedMessage.getBytes();
+        final int offset = invocation.getArgumentAt(2, Integer.class);
+        final int length = invocation.getArgumentAt(3, Integer.class);
+        assertNotNull(receivedMessage);
+        assertEquals(expectedBinary.length, receivedMessage.length);
+        assertEquals(expectedMessage, new String(receivedMessage));
+        assertEquals(0, offset);
+        assertEquals(expectedBinary.length, length);
+        latch.countDown();
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
new file mode 100644
index 0000000..249af7a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/ITJettyWebSocketSecureCommunication.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.websocket.WebSocketService;
+import org.junit.Test;
+
+
+public class ITJettyWebSocketSecureCommunication extends 
ITJettyWebSocketCommunication{
+
+    private final StandardSSLContextService sslContextService = new 
StandardSSLContextService();
+    private final ControllerServiceTestContext sslTestContext = new 
ControllerServiceTestContext(sslContextService, "SSLContextService");
+
+    public ITJettyWebSocketSecureCommunication() {
+        try {
+            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, 
"src/test/resources/certs/localhost-ks.jks");
+            
sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, 
"localtest");
+            
sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+            
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, 
"src/test/resources/certs/localhost-ks.jks");
+            
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, 
"localtest");
+            
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+
+            
sslContextService.initialize(sslTestContext.getInitializationContext());
+            
sslContextService.onConfigured(sslTestContext.getConfigurationContext());
+        } catch (InitializationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected boolean isSecure() {
+        return true;
+    }
+
+    @Override
+    protected void customizeServer() {
+        
serverServiceContext.getInitializationContext().addControllerService(sslContextService);
+        serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, 
sslContextService.getIdentifier());
+    }
+
+    @Override
+    protected void customizeClient() {
+        
clientServiceContext.getInitializationContext().addControllerService(sslContextService);
+        clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, 
sslContextService.getIdentifier());
+    }
+
+    @Test
+    public void testClientServerCommunication() throws Exception {
+        super.testClientServerCommunication();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
deleted file mode 100644
index a225447..0000000
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketCommunication.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket.jetty;
-
-import org.apache.nifi.processor.Processor;
-import org.apache.nifi.websocket.BinaryMessageConsumer;
-import org.apache.nifi.websocket.ConnectedListener;
-import org.apache.nifi.websocket.TextMessageConsumer;
-import org.apache.nifi.websocket.WebSocketClientService;
-import org.apache.nifi.websocket.WebSocketServerService;
-import org.apache.nifi.websocket.WebSocketSessionInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-
-import java.net.ServerSocket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-
-public class TestJettyWebSocketCommunication {
-
-    protected int serverPort;
-    protected String serverPath = "/test";
-    protected WebSocketServerService serverService;
-    protected ControllerServiceTestContext serverServiceContext;
-    protected WebSocketClientService clientService;
-    protected ControllerServiceTestContext clientServiceContext;
-
-    protected boolean isSecure() {
-        return false;
-    }
-
-    @Before
-    public void setup() throws Exception {
-        setupServer();
-
-        setupClient();
-    }
-
-    private void setupServer() throws Exception {
-        // Find an open port.
-        try (final ServerSocket serverSocket = new ServerSocket(0)) {
-            serverPort = serverSocket.getLocalPort();
-        }
-        serverService = new JettyWebSocketServer();
-        serverServiceContext = new ControllerServiceTestContext(serverService, 
"JettyWebSocketServer1");
-        serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, 
String.valueOf(serverPort));
-
-        customizeServer();
-
-        
serverService.initialize(serverServiceContext.getInitializationContext());
-        
serverService.startServer(serverServiceContext.getConfigurationContext());
-    }
-
-    protected void customizeServer() {
-    }
-
-    private void setupClient() throws Exception {
-        clientService = new JettyWebSocketClient();
-        clientServiceContext = new ControllerServiceTestContext(clientService, 
"JettyWebSocketClient1");
-        clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, 
(isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
-
-        customizeClient();
-
-        
clientService.initialize(clientServiceContext.getInitializationContext());
-        
clientService.startClient(clientServiceContext.getConfigurationContext());
-    }
-
-    protected void customizeClient() {
-    }
-
-    @After
-    public void teardown() throws Exception {
-        clientService.stopClient();
-        serverService.stopServer();
-    }
-
-    protected interface MockWebSocketProcessor extends Processor, 
ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
-    }
-
-    private boolean isWindowsEnvironment() {
-        return 
System.getProperty("os.name").toLowerCase().startsWith("windows");
-    }
-
-    @Test
-    public void testClientServerCommunication() throws Exception {
-        assumeFalse(isWindowsEnvironment());
-        // Expectations.
-        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
-        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
-        final CountDownLatch serverReceivedTextMessageFromClient = new 
CountDownLatch(1);
-        final CountDownLatch serverReceivedBinaryMessageFromClient = new 
CountDownLatch(1);
-        final CountDownLatch clientReceivedTextMessageFromServer = new 
CountDownLatch(1);
-        final CountDownLatch clientReceivedBinaryMessageFromServer = new 
CountDownLatch(1);
-
-        final String textMessageFromClient = "Message from client.";
-        final String textMessageFromServer = "Message from server.";
-
-        final MockWebSocketProcessor serverProcessor = 
mock(MockWebSocketProcessor.class);
-        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
-        final AtomicReference<String> serverSessionIdRef = new 
AtomicReference<>();
-
-        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, 
serverSessionIdRef, invocation))
-                
.when(serverProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> 
assertConsumeTextMessage(serverReceivedTextMessageFromClient, 
textMessageFromClient, invocation))
-                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> 
assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, 
textMessageFromClient, invocation))
-                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
-
-        serverService.registerProcessor(serverPath, serverProcessor);
-
-        final String clientId = "client1";
-
-        final MockWebSocketProcessor clientProcessor = 
mock(MockWebSocketProcessor.class);
-        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
-        final AtomicReference<String> clientSessionIdRef = new 
AtomicReference<>();
-
-
-        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, 
clientSessionIdRef, invocation))
-                
.when(clientProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> 
assertConsumeTextMessage(clientReceivedTextMessageFromServer, 
textMessageFromServer, invocation))
-                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> 
assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, 
textMessageFromServer, invocation))
-                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
-
-        clientService.registerProcessor(clientId, clientProcessor);
-
-        clientService.connect(clientId);
-
-        assertTrue("WebSocket client should be able to fire connected event.", 
clientConnectedServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to fire connected event.", 
serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
-
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromClient));
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
-
-
-        assertTrue("WebSocket server should be able to consume text message.", 
serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to consume binary 
message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
-
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromServer));
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
-
-        assertTrue("WebSocket client should be able to consume text message.", 
clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket client should be able to consume binary 
message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
-
-        clientService.deregisterProcessor(clientId, clientProcessor);
-        serverService.deregisterProcessor(serverPath, serverProcessor);
-    }
-
-    @Test
-    public void testClientServerCommunicationRecovery() throws Exception {
-        assumeFalse(isWindowsEnvironment());
-        // Expectations.
-        final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
-        final CountDownLatch clientConnectedServer = new CountDownLatch(1);
-        final CountDownLatch serverReceivedTextMessageFromClient = new 
CountDownLatch(1);
-        final CountDownLatch serverReceivedBinaryMessageFromClient = new 
CountDownLatch(1);
-        final CountDownLatch clientReceivedTextMessageFromServer = new 
CountDownLatch(1);
-        final CountDownLatch clientReceivedBinaryMessageFromServer = new 
CountDownLatch(1);
-
-        final String textMessageFromClient = "Message from client.";
-        final String textMessageFromServer = "Message from server.";
-
-        final MockWebSocketProcessor serverProcessor = 
mock(MockWebSocketProcessor.class);
-        doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
-        final AtomicReference<String> serverSessionIdRef = new 
AtomicReference<>();
-
-        doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, 
serverSessionIdRef, invocation))
-                
.when(serverProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> 
assertConsumeTextMessage(serverReceivedTextMessageFromClient, 
textMessageFromClient, invocation))
-                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> 
assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, 
textMessageFromClient, invocation))
-                
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
-
-        serverService.registerProcessor(serverPath, serverProcessor);
-
-        final String clientId = "client1";
-
-        final MockWebSocketProcessor clientProcessor = 
mock(MockWebSocketProcessor.class);
-        doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
-        final AtomicReference<String> clientSessionIdRef = new 
AtomicReference<>();
-
-
-        doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, 
clientSessionIdRef, invocation))
-                
.when(clientProcessor).connected(any(WebSocketSessionInfo.class));
-
-        doAnswer(invocation -> 
assertConsumeTextMessage(clientReceivedTextMessageFromServer, 
textMessageFromServer, invocation))
-                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
-
-        doAnswer(invocation -> 
assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, 
textMessageFromServer, invocation))
-                
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), 
any(byte[].class), anyInt(), anyInt());
-
-        clientService.registerProcessor(clientId, clientProcessor);
-
-        clientService.connect(clientId);
-
-        assertTrue("WebSocket client should be able to fire connected event.", 
clientConnectedServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to fire connected event.", 
serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
-
-        // Nothing happens if maintenance is executed while sessions are alive.
-        ((JettyWebSocketClient) clientService).maintainSessions();
-
-        // Restart server.
-        serverService.stopServer();
-        
serverService.startServer(serverServiceContext.getConfigurationContext());
-
-        // Sessions will be recreated with the same session ids.
-        ((JettyWebSocketClient) clientService).maintainSessions();
-
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromClient));
-        clientService.sendMessage(clientId, clientSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
-
-        assertTrue("WebSocket server should be able to consume text message.", 
serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket server should be able to consume binary 
message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
-
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendString(textMessageFromServer));
-        serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender 
-> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
-
-        assertTrue("WebSocket client should be able to consume text message.", 
clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
-        assertTrue("WebSocket client should be able to consume binary 
message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
-
-        clientService.deregisterProcessor(clientId, clientProcessor);
-        serverService.deregisterProcessor(serverPath, serverProcessor);
-    }
-
-    protected Object assertConnectedEvent(CountDownLatch latch, 
AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
-        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, 
WebSocketSessionInfo.class);
-        assertNotNull(sessionInfo.getLocalAddress());
-        assertNotNull(sessionInfo.getRemoteAddress());
-        assertNotNull(sessionInfo.getSessionId());
-        assertEquals(isSecure(), sessionInfo.isSecure());
-        sessionIdRef.set(sessionInfo.getSessionId());
-        latch.countDown();
-        return null;
-    }
-
-    protected Object assertConsumeTextMessage(CountDownLatch latch, String 
expectedMessage, InvocationOnMock invocation) {
-        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, 
WebSocketSessionInfo.class);
-        assertNotNull(sessionInfo.getLocalAddress());
-        assertNotNull(sessionInfo.getRemoteAddress());
-        assertNotNull(sessionInfo.getSessionId());
-        assertEquals(isSecure(), sessionInfo.isSecure());
-
-        final String receivedMessage = invocation.getArgumentAt(1, 
String.class);
-        assertNotNull(receivedMessage);
-        assertEquals(expectedMessage, receivedMessage);
-        latch.countDown();
-        return null;
-    }
-
-    protected Object assertConsumeBinaryMessage(CountDownLatch latch, String 
expectedMessage, InvocationOnMock invocation) {
-        final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, 
WebSocketSessionInfo.class);
-        assertNotNull(sessionInfo.getLocalAddress());
-        assertNotNull(sessionInfo.getRemoteAddress());
-        assertNotNull(sessionInfo.getSessionId());
-        assertEquals(isSecure(), sessionInfo.isSecure());
-
-        final byte[] receivedMessage = invocation.getArgumentAt(1, 
byte[].class);
-        final byte[] expectedBinary = expectedMessage.getBytes();
-        final int offset = invocation.getArgumentAt(2, Integer.class);
-        final int length = invocation.getArgumentAt(3, Integer.class);
-        assertNotNull(receivedMessage);
-        assertEquals(expectedBinary.length, receivedMessage.length);
-        assertEquals(expectedMessage, new String(receivedMessage));
-        assertEquals(0, offset);
-        assertEquals(expectedBinary.length, length);
-        latch.countDown();
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a774f1df/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
 
b/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
deleted file mode 100644
index f5c96c2..0000000
--- 
a/nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/test/java/org/apache/nifi/websocket/jetty/TestJettyWebSocketSecureCommunication.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.websocket.jetty;
-
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.ssl.StandardSSLContextService;
-import org.apache.nifi.websocket.WebSocketService;
-import org.junit.Test;
-
-
-public class TestJettyWebSocketSecureCommunication extends 
TestJettyWebSocketCommunication{
-
-    private final StandardSSLContextService sslContextService = new 
StandardSSLContextService();
-    private final ControllerServiceTestContext sslTestContext = new 
ControllerServiceTestContext(sslContextService, "SSLContextService");
-
-    public TestJettyWebSocketSecureCommunication() {
-        try {
-            sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, 
"src/test/resources/certs/localhost-ks.jks");
-            
sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, 
"localtest");
-            
sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
-            
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, 
"src/test/resources/certs/localhost-ks.jks");
-            
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, 
"localtest");
-            
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
-
-            
sslContextService.initialize(sslTestContext.getInitializationContext());
-            
sslContextService.onConfigured(sslTestContext.getConfigurationContext());
-        } catch (InitializationException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    protected boolean isSecure() {
-        return true;
-    }
-
-    @Override
-    protected void customizeServer() {
-        
serverServiceContext.getInitializationContext().addControllerService(sslContextService);
-        serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, 
sslContextService.getIdentifier());
-    }
-
-    @Override
-    protected void customizeClient() {
-        
clientServiceContext.getInitializationContext().addControllerService(sslContextService);
-        clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, 
sslContextService.getIdentifier());
-    }
-
-    @Test
-    public void testClientServerCommunication() throws Exception {
-        super.testClientServerCommunication();
-    }
-
-}

Reply via email to