This is an automated email from the ASF dual-hosted git repository.

more pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 96587a4  KNOX-1997: Adding changes to buffer messages from backend in 
onMessag… (#143)
96587a4 is described below

commit 96587a4ed89ce6691198a0df8a96a9c81ddb875f
Author: Rajat Goel <[email protected]>
AuthorDate: Wed Nov 13 19:55:32 2019 +0530

    KNOX-1997: Adding changes to buffer messages from backend in onMessag… 
(#143)
    
    * KNOX-1997: Adding changes to buffer messages from backend in 
onMessageText() if frontend session related data structures have not been setup 
i.e. remote is null. Message buffer will be flushed when remote is set by other 
thread executing onWebSocketConnect() API. To synchronise reading/flushing 
buffer, added a lock
---
 .../gateway/config/impl/GatewayConfigImpl.java     |  7 ++
 .../websockets/GatewayWebsocketHandler.java        |  3 +-
 .../gateway/websockets/ProxyWebSocketAdapter.java  | 72 ++++++++++++++--
 .../knox/gateway/websockets/BadBackendTest.java    |  5 +-
 .../gateway/websockets/ConnectionDroppedTest.java  |  5 +-
 .../gateway/websockets/MessageFailureTest.java     |  5 +-
 .../WebsocketEchoHTTPServiceRoleTest.java          |  1 +
 .../knox/gateway/websockets/WebsocketEchoTest.java |  3 +-
 .../gateway/websockets/WebsocketEchoTestBase.java  | 18 ++--
 .../WebsocketMultipleConnectionTest.java           |  7 +-
 ...va => WebsocketServerInitiatedMessageTest.java} | 98 ++++++++++++++--------
 .../apache/knox/gateway/config/GatewayConfig.java  |  7 ++
 .../org/apache/knox/gateway/GatewayTestConfig.java |  6 ++
 13 files changed, 181 insertions(+), 56 deletions(-)

diff --git 
a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
 
b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
index 71dd591..342628b 100644
--- 
a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
+++ 
b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
@@ -141,6 +141,7 @@ public class GatewayConfigImpl extends Configuration 
implements GatewayConfig {
   public static final String WEBSOCKET_INPUT_BUFFER_SIZE = 
GATEWAY_CONFIG_FILE_PREFIX + ".websocket.input.buffer.size";
   public static final String WEBSOCKET_ASYNC_WRITE_TIMEOUT = 
GATEWAY_CONFIG_FILE_PREFIX + ".websocket.async.write.timeout";
   public static final String WEBSOCKET_IDLE_TIMEOUT = 
GATEWAY_CONFIG_FILE_PREFIX + ".websocket.idle.timeout";
+  public static final String WEBSOCKET_MAX_WAIT_BUFFER_COUNT = 
GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.wait.buffer.count";
 
   /**
    * Properties for for gateway port mapping feature
@@ -190,6 +191,7 @@ public class GatewayConfigImpl extends Configuration 
implements GatewayConfig {
   public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE = 4096;
   public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT = 60000;
   public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT = 300000;
+  public static final int DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT = 100;
 
   public static final boolean DEFAULT_GATEWAY_PORT_MAPPING_ENABLED = true;
   public static final boolean DEFAULT_REMOTE_ALIAS_SERVICE_ENABLED = true;
@@ -851,6 +853,11 @@ public class GatewayConfigImpl extends Configuration 
implements GatewayConfig {
   }
 
   @Override
+  public int getWebsocketMaxWaitBufferCount() {
+    return getInt( WEBSOCKET_MAX_WAIT_BUFFER_COUNT, 
DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT);
+  }
+
+  @Override
   public Map<String, Integer> getGatewayPortMappings() {
 
     final Map<String, Integer> result = new ConcurrentHashMap<>();
diff --git 
a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
 
b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
index 0f19052..8fff20b 100644
--- 
a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
+++ 
b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
@@ -118,7 +118,8 @@ public class GatewayWebsocketHandler extends 
WebSocketHandler
       LOG.debugLog("Generated backend URL for websocket connection: " + 
backendURL);
 
       /* Upgrade happens here */
-      return new ProxyWebSocketAdapter(URI.create(backendURL), pool, 
getClientEndpointConfig(req));
+      return new ProxyWebSocketAdapter
+              (URI.create(backendURL), pool, getClientEndpointConfig(req), 
config);
     } catch (final Exception e) {
       LOG.failedCreatingWebSocket(e);
       throw e;
diff --git 
a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
 
b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
index 4c345cb..6364a01 100644
--- 
a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
+++ 
b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
@@ -19,7 +19,11 @@ package org.apache.knox.gateway.websockets;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.websocket.ClientEndpointConfig;
 import javax.websocket.CloseReason;
@@ -28,6 +32,7 @@ import javax.websocket.DeploymentException;
 import javax.websocket.WebSocketContainer;
 
 import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.config.GatewayConfig;
 import org.eclipse.jetty.io.RuntimeIOException;
 import org.eclipse.jetty.util.component.LifeCycle;
 import org.eclipse.jetty.websocket.api.BatchMode;
@@ -57,21 +62,30 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter 
{
 
   private ExecutorService pool;
 
+  /* Message buffer for holding data frames temporarily in memory till 
connection is setup.
+   Keeping the max size of the buffer as 100 messages for now. */
+  private List<String> messageBuffer = new ArrayList<String>();
+  private Lock remoteLock = new ReentrantLock();
+
+  private final GatewayConfig config;
+
   /**
    * Used to transmit headers from browser to backend server.
    * @since 0.14
    */
   private ClientEndpointConfig clientConfig;
 
-  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool) {
-    this(backend, pool, null);
+  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, 
GatewayConfig config) {
+    this(backend, pool, null, config);
   }
 
-  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, 
final ClientEndpointConfig clientConfig) {
+  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, 
final ClientEndpointConfig clientConfig,
+                               GatewayConfig config) {
     super();
     this.backend = backend;
     this.pool = pool;
     this.clientConfig = clientConfig;
+    this.config = config;
   }
 
   @Override
@@ -104,9 +118,33 @@ public class ProxyWebSocketAdapter extends 
WebSocketAdapter {
       throw new RuntimeIOException(e);
     }
 
+    remoteLock.lock();
     super.onWebSocketConnect(frontEndSession);
     this.frontendSession = frontEndSession;
 
+    final RemoteEndpoint remote = frontEndSession.getRemote();
+    try {
+      if (!messageBuffer.isEmpty()) {
+        LOG.debugLog("Found old buffered messages");
+        for (String obj:messageBuffer) {
+          LOG.debugLog("Sending old buffered message [From Backend <---]: " + 
obj);
+          remote.sendString(obj);
+        }
+        messageBuffer.clear();
+        if (remote.getBatchMode() == BatchMode.ON) {
+          remote.flush();
+        }
+      } else {
+        LOG.debugLog("Message buffer is empty");
+      }
+    } catch (IOException e) {
+      LOG.connectionFailed(e);
+      throw new RuntimeIOException(e);
+    }
+    finally
+    {
+      remoteLock.unlock();
+    }
   }
 
   @Override
@@ -198,12 +236,29 @@ public class ProxyWebSocketAdapter extends 
WebSocketAdapter {
 
       @Override
       public void onMessageText(String message, Object session) {
+        LOG.logMessage("[From Backend <---]" + message);
+        remoteLock.lock();
         final RemoteEndpoint remote = getRemote();
+        try {
+          if (remote == null) {
+            LOG.debugLog("Remote endpoint is null");
+            if (messageBuffer.size() >= 
config.getWebsocketMaxWaitBufferCount()) {
+              throw new RuntimeIOException("Remote is null and message buffer 
is full. Cannot buffer anymore ");
+            }
+            LOG.debugLog("Buffering message: " + message);
+            messageBuffer.add(message);
+            return;
+          }
 
-        LOG.logMessage("[From Backend <---]" + message);
+          /* Proxy message to frontend */
+          LOG.debugLog("Found old buffered messages");
+          for (String obj:messageBuffer) {
+            LOG.debugLog("Sending old buffered message [From Backend <---]: " 
+ obj);
+            remote.sendString(obj);
+          }
+          messageBuffer.clear();
 
-        /* Proxy message to frontend */
-        try {
+          LOG.debugLog("Sending current message [From Backend <---]: " + 
message);
           remote.sendString(message);
           if (remote.getBatchMode() == BatchMode.ON) {
             remote.flush();
@@ -212,7 +267,10 @@ public class ProxyWebSocketAdapter extends 
WebSocketAdapter {
           LOG.connectionFailed(e);
           throw new RuntimeIOException(e);
         }
-
+        finally
+        {
+          remoteLock.unlock();
+        }
       }
 
       @Override
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
index 6637813..194f172 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.knox.gateway.websockets;
 
+import org.apache.knox.gateway.config.GatewayConfig;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.ContextHandler;
@@ -25,6 +26,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.easymock.EasyMock;
 
 import javax.websocket.CloseReason;
 import javax.websocket.ContainerProvider;
@@ -77,13 +79,14 @@ public class BadBackendTest {
   }
 
   private static void startProxy() throws Exception {
+    GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
     proxy = new Server();
     proxyConnector = new ServerConnector(proxy);
     proxy.addConnector(proxyConnector);
 
     /* start Knox with WebsocketAdapter to test */
     final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
-        new ProxyWebSocketAdapter(new URI(BAD_BACKEND), 
Executors.newFixedThreadPool(10)));
+        new ProxyWebSocketAdapter(new URI(BAD_BACKEND), 
Executors.newFixedThreadPool(10), gatewayConfig));
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
index 80e3509..b2cf243 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
@@ -17,12 +17,14 @@
  */
 package org.apache.knox.gateway.websockets;
 
+import org.apache.knox.gateway.config.GatewayConfig;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.ContextHandler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.easymock.EasyMock;
 
 import javax.websocket.ContainerProvider;
 import javax.websocket.WebSocketContainer;
@@ -112,13 +114,14 @@ public class ConnectionDroppedTest {
   }
 
   private static void startProxy() throws Exception {
+    GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
     proxy = new Server();
     proxyConnector = new ServerConnector(proxy);
     proxy.addConnector(proxyConnector);
 
     /* start Knox with WebsocketAdapter to test */
     final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
-        new ProxyWebSocketAdapter(serverUri, 
Executors.newFixedThreadPool(10)));
+        new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10), 
gatewayConfig));
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
index 64b7f9a..e855f82 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.knox.gateway.websockets;
 
+import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -26,6 +27,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.easymock.EasyMock;
 
 import javax.websocket.CloseReason;
 import javax.websocket.ContainerProvider;
@@ -152,13 +154,14 @@ public class MessageFailureTest {
   }
 
   private static void startProxy() throws Exception {
+    GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
     proxy = new Server();
     proxyConnector = new ServerConnector(proxy);
     proxy.addConnector(proxyConnector);
 
     /* start Knox with WebsocketAdapter to test */
     final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
-        new ProxyWebSocketAdapter(serverUri, 
Executors.newFixedThreadPool(10)));
+        new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10), 
gatewayConfig));
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
index e66015a..717a454 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
@@ -59,6 +59,7 @@ public class WebsocketEchoHTTPServiceRoleTest extends 
WebsocketEchoTestBase {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    handler = null;
     WebsocketEchoTestBase.setUpBeforeClass();
     WebsocketEchoTestBase.startServers("http");
   }
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
index 7a08b9e..1d3dbb8 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
@@ -21,10 +21,10 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.net.URI;
 import javax.websocket.ContainerProvider;
 import javax.websocket.Session;
 import javax.websocket.WebSocketContainer;
-import java.net.URI;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -58,6 +58,7 @@ public class WebsocketEchoTest extends WebsocketEchoTestBase {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    handler = null;
     WebsocketEchoTestBase.setUpBeforeClass();
     WebsocketEchoTestBase.startServers("ws");
   }
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
index a62d330..86a580a 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
@@ -36,6 +36,7 @@ import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.ContextHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
 
 import java.io.File;
 import java.io.IOException;
@@ -60,9 +61,7 @@ import static 
org.apache.knox.gateway.config.GatewayConfig.DEFAULT_IDENTITY_KEYS
 import static 
org.apache.knox.gateway.config.GatewayConfig.DEFAULT_IDENTITY_KEY_PASSPHRASE_ALIAS;
 
 /**
- * Base class for tests that attempt to proxy websocket connections through 
Knox
- * gateway. It setups a websocket socket connection that simply echoes data 
back.
- *
+ * Base class for websocoket echo tests.
  */
 public class WebsocketEchoTestBase {
   private static final String TEST_KEY_ALIAS = "test-identity";
@@ -70,7 +69,7 @@ public class WebsocketEchoTestBase {
   /**
    * Simulate backend websocket
    */
-  private static Server backendServer;
+  public static Server backendServer;
   /**
    * URI for backend websocket server
    */
@@ -93,6 +92,8 @@ public class WebsocketEchoTestBase {
    */
   public static URI serverUri;
 
+  public static WebSocketHandler handler;
+
   private static File topoDir;
   private static Path dataDir;
   private static Path securityDir;
@@ -142,7 +143,11 @@ public class WebsocketEchoTestBase {
     ServerConnector connector = new ServerConnector(backendServer);
     backendServer.addConnector(connector);
 
-    final WebsocketEchoHandler handler = new WebsocketEchoHandler();
+    synchronized (WebsocketEchoTestBase.class) {
+      if (handler == null) {
+        handler = new WebsocketEchoHandler();
+      }
+    }
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");
@@ -272,6 +277,9 @@ public class WebsocketEchoTestBase {
     EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
         
.andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
 
+    EasyMock.expect(gatewayConfig.getWebsocketMaxWaitBufferCount())
+        
.andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT).anyTimes();
+
     EasyMock.expect(gatewayConfig.getRemoteRegistryConfigurationNames())
             .andReturn(Collections.emptyList())
             .anyTimes();
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
index c69d5b9..a2a4ae7 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
@@ -113,7 +113,7 @@ public class WebsocketMultipleConnectionTest {
   /**
    * Maximum number of open connections to test.
    */
-  private static int MAX_CONNECTIONS = 100;
+  private static int MAX_CONNECTIONS = 99;
 
   public WebsocketMultipleConnectionTest() {
     super();
@@ -172,7 +172,7 @@ public class WebsocketMultipleConnectionTest {
       }
     }
 
-    latch.await(5 * MAX_CONNECTIONS, TimeUnit.MILLISECONDS);
+    latch.await(50 * MAX_CONNECTIONS, TimeUnit.MILLISECONDS);
 
     /* 90 KB per connection */
     /*
@@ -314,6 +314,9 @@ public class WebsocketMultipleConnectionTest {
     EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
         
.andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
 
+    EasyMock.expect(gatewayConfig.getWebsocketMaxWaitBufferCount())
+        
.andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT).anyTimes();
+
     EasyMock.expect(gatewayConfig.getRemoteRegistryConfigurationNames())
             .andReturn(Collections.emptyList())
             .anyTimes();
diff --git 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedMessageTest.java
similarity index 51%
copy from 
gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
copy to 
gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedMessageTest.java
index 7a08b9e..26722df 100644
--- 
a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
+++ 
b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedMessageTest.java
@@ -17,13 +17,23 @@
  */
 package org.apache.knox.gateway.websockets;
 
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.websocket.ContainerProvider;
-import javax.websocket.Session;
 import javax.websocket.WebSocketContainer;
+import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
 
@@ -50,14 +60,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
  *
  * @since 0.10
  */
-public class WebsocketEchoTest extends WebsocketEchoTestBase {
+public class WebsocketServerInitiatedMessageTest extends WebsocketEchoTestBase 
{
 
-  public WebsocketEchoTest() {
+  public WebsocketServerInitiatedMessageTest() {
     super();
   }
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    handler = new WebsocketServerInitiatedEchoHandler();
     WebsocketEchoTestBase.setUpBeforeClass();
     WebsocketEchoTestBase.startServers("ws");
   }
@@ -68,51 +79,64 @@ public class WebsocketEchoTest extends 
WebsocketEchoTestBase {
   }
 
   /*
-   * Test direct connection to websocket server without gateway
+   * Test websocket server initiated echo
    */
   @Test
-  public void testDirectEcho() throws Exception {
-
+  public void testGatewayServerInitiatedEcho() throws Exception {
     WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
     WebsocketClient client = new WebsocketClient();
+    container.connectToServer(client,
+            new URI(serverUri.toString() + 
"gateway/websocket/123foo456bar/channels"));
 
-    Session session = container.connectToServer(client, backendServerUri);
+    //session.getBasicRemote().sendText("Echo");
+    client.messageQueue.awaitMessages(1, 5000, TimeUnit.MILLISECONDS);
 
-    session.getBasicRemote().sendText("Echo");
-    client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+    assertThat(client.messageQueue.get(0), is("echo"));
   }
 
-  /*
-   * Test websocket proxying through gateway.
+  /**
+   * A Mock websocket handler
+   *
    */
-  @Test
-  public void testGatewayEcho() throws Exception {
-    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-
-    WebsocketClient client = new WebsocketClient();
-    Session session = container.connectToServer(client,
-        new URI(serverUri.toString() + "gateway/websocket/ws"));
-
-    session.getBasicRemote().sendText("Echo");
-    client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
-
-    assertThat(client.messageQueue.get(0), is("Echo"));
+  private static class WebsocketServerInitiatedEchoHandler extends 
WebSocketHandler implements WebSocketCreator {
+    private final ServerInitiatingMessageSocket socket = new 
ServerInitiatingMessageSocket();
+
+    @Override
+    public void configure(WebSocketServletFactory factory) {
+      factory.getPolicy().setMaxTextMessageSize(2 * 1024 * 1024);
+      factory.setCreator(this);
+    }
+
+    @Override
+    public Object createWebSocket(ServletUpgradeRequest req, 
ServletUpgradeResponse resp) {
+      return socket;
+    }
   }
 
-  /*
-   * Test websocket rewrite rules proxying through gateway.
+  /**
+   * A simple socket initiating message on connect
    */
-  @Test
-  public void testGatewayRewriteEcho() throws Exception {
-    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-
-    WebsocketClient client = new WebsocketClient();
-    Session session = container.connectToServer(client,
-            new URI(serverUri.toString() + 
"gateway/websocket/123foo456bar/channels"));
-
-    session.getBasicRemote().sendText("Echo");
-    client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
-
-    assertThat(client.messageQueue.get(0), is("Echo"));
+  private static class ServerInitiatingMessageSocket extends WebSocketAdapter {
+
+    @Override
+    public void onWebSocketError(Throwable cause) {
+      throw new RuntimeException(cause);
+    }
+
+    @Override
+    public void onWebSocketConnect(Session sess) {
+      super.onWebSocketConnect(sess);
+
+      try {
+        RemoteEndpoint remote = getRemote();
+        remote.sendString("echo", null);
+        if (remote.getBatchMode() == BatchMode.ON) {
+          remote.flush();
+        }
+      } catch (IOException x) {
+        throw new RuntimeIOException(x);
+      }
+    }
   }
 }
diff --git 
a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java 
b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
index 790e6b2..3e6b32a 100644
--- 
a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
+++ 
b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
@@ -379,6 +379,13 @@ public interface GatewayConfig {
    */
   int getWebsocketIdleTimeout();
 
+  /**
+   * Max count of messages that can be temporarily buffered in memory before a 
connection is properly setup.
+   * @since 0.10
+   * @return buffer size
+   */
+  int getWebsocketMaxWaitBufferCount();
+
   boolean isMetricsEnabled();
 
   boolean isJmxMetricsReportingEnabled();
diff --git 
a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
 
b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
index 2f3d588..4a2c754 100644
--- 
a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
+++ 
b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
@@ -47,6 +47,7 @@ public class GatewayTestConfig extends Configuration 
implements GatewayConfig {
   public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE = 4096;
   public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT = 60000;
   public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT = 300000;
+  public static final int DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT = 100;
 
   private Path gatewayHomePath = Paths.get("gateway-home");
   private String hadoopConfDir = "hadoop";
@@ -545,6 +546,11 @@ public class GatewayTestConfig extends Configuration 
implements GatewayConfig {
   }
 
   @Override
+  public int getWebsocketMaxWaitBufferCount() {
+    return DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT;
+  }
+
+  @Override
   public boolean isMetricsEnabled() {
     return false;
   }

Reply via email to