KNOX-895 - Pass Headers and Cookies to websocket backend
Project: http://git-wip-us.apache.org/repos/asf/knox/repo Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/2d236e78 Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/2d236e78 Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/2d236e78 Branch: refs/heads/KNOX-1049 Commit: 2d236e78b70ef7fb312ebf0fa198657595e2f4ba Parents: 7b401de Author: Sandeep More <[email protected]> Authored: Wed Oct 11 17:04:52 2017 -0400 Committer: Sandeep More <[email protected]> Committed: Wed Oct 11 17:04:52 2017 -0400 ---------------------------------------------------------------------- .../websockets/GatewayWebsocketHandler.java | 41 +- .../gateway/websockets/ProxyInboundClient.java | 107 ++++++ .../websockets/ProxyWebSocketAdapter.java | 20 +- .../websockets/ProxyInboundClientTest.java | 374 +++++++++++++++++++ 4 files changed, 530 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java index 75a4a2b..0ee54fd 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java @@ -21,6 +21,8 @@ import java.io.File; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -40,11 +42,13 @@ import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; import org.eclipse.jetty.websocket.servlet.WebSocketCreator; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import javax.websocket.ClientEndpointConfig; + /** * Websocket handler that will handle websocket connection request. This class * is responsible for creating a proxy socket for inbound and outbound * connections. This is also where the http to websocket handoff happens. - * + * * @since 0.10 */ public class GatewayWebsocketHandler extends WebSocketHandler @@ -74,7 +78,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler /** * Create an instance - * + * * @param config * @param services */ @@ -90,7 +94,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler /* * (non-Javadoc) - * + * * @see * org.eclipse.jetty.websocket.server.WebSocketHandler#configure(org.eclipse. * jetty.websocket.servlet.WebSocketServletFactory) @@ -119,7 +123,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler /* * (non-Javadoc) - * + * * @see * org.eclipse.jetty.websocket.servlet.WebSocketCreator#createWebSocket(org. * eclipse.jetty.websocket.servlet.ServletUpgradeRequest, @@ -137,7 +141,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler final String backendURL = getMatchedBackendURL(path); /* Upgrade happens here */ - return new ProxyWebSocketAdapter(URI.create(backendURL), pool); + return new ProxyWebSocketAdapter(URI.create(backendURL), pool, getClientEndpointConfig(req)); } catch (final Exception e) { LOG.failedCreatingWebSocket(e); throw e; @@ -145,11 +149,32 @@ public class GatewayWebsocketHandler extends WebSocketHandler } /** + * Returns a {@link ClientEndpointConfig} config that contains the headers + * to be passed to the backend. + * @since 0.14.0 + * @param req + * @return + */ + private ClientEndpointConfig getClientEndpointConfig(final ServletUpgradeRequest req) { + + return ClientEndpointConfig.Builder.create().configurator( new ClientEndpointConfig.Configurator() { + + @Override + public void beforeRequest(final Map<String, List<String>> headers) { + + /* Add request headers */ + req.getHeaders().forEach(headers::putIfAbsent); + + } + }).build(); + } + + /** * This method looks at the context path and returns the backend websocket * url. If websocket url is found it is used as is, or we default to * ws://{host}:{port} which might or might not be right. - * - * @param The context path + * + * @param * @return Websocket backend url */ private synchronized String getMatchedBackendURL(final String path) { @@ -203,7 +228,7 @@ public class GatewayWebsocketHandler extends WebSocketHandler URI serviceUri = new URI(backendURL); backend.append(serviceUri); /* Avoid Zeppelin Regression - as this would require ambari changes and break current knox websocket use case*/ - if (!StringUtils.endsWith(backend.toString(), "/ws") && pathService[1] != null) { + if (!StringUtils.endsWith(backend.toString(), "/ws") && pathService.length > 0 && pathService[1] != null) { backend.append(pathService[1]); } } http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java new file mode 100644 index 0000000..4e938d2 --- /dev/null +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundClient.java @@ -0,0 +1,107 @@ +package org.apache.hadoop.gateway.websockets; + +import javax.websocket.CloseReason; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.MessageHandler; +import javax.websocket.Session; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +/** + * A Websocket client with callback which is not annotation based. + * This handler accepts String and binary messages. + * @since 0.14.0 + */ +public class ProxyInboundClient extends Endpoint { + + /** + * Callback to be called once we have events on our socket. + */ + private MessageEventCallback callback; + + protected Session session; + protected EndpointConfig config; + + + public ProxyInboundClient(final MessageEventCallback callback) { + super(); + this.callback = callback; + } + + /** + * Developers must implement this method to be notified when a new + * conversation has just begun. + * + * @param backendSession the session that has just been activated. + * @param config the configuration used to configure this endpoint. + */ + @Override + public void onOpen(final javax.websocket.Session backendSession, final EndpointConfig config) { + this.session = backendSession; + this.config = config; + + /* Set the max message size */ + session.setMaxBinaryMessageBufferSize(Integer.MAX_VALUE); + session.setMaxTextMessageBufferSize(Integer.MAX_VALUE); + + /* Add message handler for binary data */ + session.addMessageHandler(new MessageHandler.Whole<byte[]>() { + + /** + * Called when the message has been fully received. + * + * @param message the message data. + */ + @Override + public void onMessage(final byte[] message) { + callback.onMessageBinary(message, true, session); + } + + }); + + /* Add message handler for text data */ + session.addMessageHandler(new MessageHandler.Whole<String>() { + + /** + * Called when the message has been fully received. + * + * @param message the message data. + */ + @Override + public void onMessage(final String message) { + callback.onMessageText(message, session); + } + + }); + + callback.onConnectionOpen(backendSession); + } + + @Override + public void onClose(final javax.websocket.Session backendSession, final CloseReason closeReason) { + callback.onConnectionClose(closeReason); + this.session = null; + } + + @Override + public void onError(final javax.websocket.Session backendSession, final Throwable cause) { + callback.onError(cause); + this.session = null; + } + +} http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java index 1e7f583..4ea8d6c 100644 --- a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java +++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java @@ -20,8 +20,8 @@ package org.apache.hadoop.gateway.websockets; import java.io.IOException; import java.net.URI; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import javax.websocket.ClientEndpointConfig; import javax.websocket.CloseReason; import javax.websocket.ContainerProvider; import javax.websocket.DeploymentException; @@ -60,12 +60,23 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter { private ExecutorService pool; /** + * Used to transmit headers from browser to backend server. + * @since 0.14 + */ + private ClientEndpointConfig clientConfig; + + /** * Create an instance */ public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool) { + this(backend, pool, null); + } + + public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, final ClientEndpointConfig clientConfig) { super(); this.backend = backend; this.pool = pool; + this.clientConfig = clientConfig; } @Override @@ -76,14 +87,15 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter { * plumbing takes place */ container = ContainerProvider.getWebSocketContainer(); - final ProxyInboundSocket backendSocket = new ProxyInboundSocket( - getMessageCallback()); + + final ProxyInboundClient backendSocket = new ProxyInboundClient(getMessageCallback()); /* build the configuration */ /* Attempt Connect */ try { - backendSession = container.connectToServer(backendSocket, backend); + backendSession = container.connectToServer(backendSocket, clientConfig, backend); + LOG.onConnectionOpen(backend.toString()); } catch (DeploymentException e) { http://git-wip-us.apache.org/repos/asf/knox/blob/2d236e78/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java ---------------------------------------------------------------------- diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java new file mode 100644 index 0000000..69b45dd --- /dev/null +++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ProxyInboundClientTest.java @@ -0,0 +1,374 @@ +package org.apache.hadoop.gateway.websockets; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +import org.apache.commons.lang.RandomStringUtils; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.websocket.CloseReason; +import javax.websocket.ContainerProvider; +import javax.websocket.DeploymentException; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.instanceOf; + +/** + * Test {@link ProxyInboundClient} class. + * @since 0.14.0 + */ +public class ProxyInboundClientTest { + + private static Server server; + private static URI serverUri; + private static Handler handler; + + String recievedMessage = null; + + byte[] recievedBinaryMessage = null; + + + /* create an instance */ + public ProxyInboundClientTest() { + super(); + } + + @BeforeClass + public static void startWSServer() throws Exception + { + server = new Server(); + ServerConnector connector = new ServerConnector(server); + server.addConnector(connector); + + handler = new WebsocketEchoHandler(); + + ContextHandler context = new ContextHandler(); + context.setContextPath("/"); + context.setHandler(handler); + server.setHandler(context); + + server.start(); + + String host = connector.getHost(); + if (host == null) + { + host = "localhost"; + } + int port = connector.getLocalPort(); + serverUri = new URI(String.format("ws://%s:%d/",host,port)); + } + + @AfterClass + public static void stopServer() + { + try + { + server.stop(); + } + catch (Exception e) + { + e.printStackTrace(System.err); + } + } + + //@Test(timeout = 3000) + @Test + public void testClientInstance() throws IOException, DeploymentException { + + final String textMessage = "Echo"; + final ByteBuffer binarymessage = ByteBuffer.wrap(textMessage.getBytes()); + + final AtomicBoolean isTestComplete = new AtomicBoolean(false); + + final WebSocketContainer container = ContainerProvider.getWebSocketContainer(); + final ProxyInboundClient client = new ProxyInboundClient( new MessageEventCallback() { + + /** + * A generic callback, can be left un-implemented + * + * @param message + */ + @Override + public void doCallback(String message) { + + } + + /** + * Callback when connection is established. + * + * @param session + */ + @Override + public void onConnectionOpen(Object session) { + + } + + /** + * Callback when connection is closed. + * + * @param reason + */ + @Override + public void onConnectionClose(CloseReason reason) { + isTestComplete.set(true); + } + + /** + * Callback when there is an error in connection. + * + * @param cause + */ + @Override + public void onError(Throwable cause) { + isTestComplete.set(true); + } + + /** + * Callback when a text message is received. + * + * @param message + * @param session + */ + @Override + public void onMessageText(String message, Object session) { + recievedMessage = message; + isTestComplete.set(true); + } + + /** + * Callback when a binary message is received. + * + * @param message + * @param last + * @param session + */ + @Override + public void onMessageBinary(byte[] message, boolean last, + Object session) { + + } + } ); + + Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class)); + + Session session = container.connectToServer(client, serverUri); + + session.getBasicRemote().sendText(textMessage); + + while(!isTestComplete.get()) { + /* just wait for the test to finish */ + } + + Assert.assertEquals("The received text message is not the same as the sent", textMessage, recievedMessage); + } + + @Test(timeout = 3000) + public void testBinarymessage() throws IOException, DeploymentException { + + final String textMessage = "Echo"; + final ByteBuffer binarymessage = ByteBuffer.wrap(textMessage.getBytes()); + + final AtomicBoolean isTestComplete = new AtomicBoolean(false); + + final WebSocketContainer container = ContainerProvider.getWebSocketContainer(); + final ProxyInboundClient client = new ProxyInboundClient( new MessageEventCallback() { + + /** + * A generic callback, can be left un-implemented + * + * @param message + */ + @Override + public void doCallback(String message) { + + } + + /** + * Callback when connection is established. + * + * @param session + */ + @Override + public void onConnectionOpen(Object session) { + + } + + /** + * Callback when connection is closed. + * + * @param reason + */ + @Override + public void onConnectionClose(CloseReason reason) { + isTestComplete.set(true); + } + + /** + * Callback when there is an error in connection. + * + * @param cause + */ + @Override + public void onError(Throwable cause) { + isTestComplete.set(true); + } + + /** + * Callback when a text message is received. + * + * @param message + * @param session + */ + @Override + public void onMessageText(String message, Object session) { + recievedMessage = message; + isTestComplete.set(true); + } + + /** + * Callback when a binary message is received. + * + * @param message + * @param last + * @param session + */ + @Override + public void onMessageBinary(byte[] message, boolean last, + Object session) { + recievedBinaryMessage = message; + isTestComplete.set(true); + } + } ); + + Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class)); + + Session session = container.connectToServer(client, serverUri); + + session.getBasicRemote().sendBinary(binarymessage); + + while(!isTestComplete.get()) { + /* just wait for the test to finish */ + } + + Assert.assertEquals("Binary message does not match", textMessage, new String(recievedBinaryMessage)); + } + + @Test(timeout = 3000) + public void testTextMaxBufferLimit() throws IOException, DeploymentException { + + final String longMessage = RandomStringUtils.random(100000); + + final AtomicBoolean isTestComplete = new AtomicBoolean(false); + + final WebSocketContainer container = ContainerProvider.getWebSocketContainer(); + final ProxyInboundClient client = new ProxyInboundClient( new MessageEventCallback() { + + /** + * A generic callback, can be left un-implemented + * + * @param message + */ + @Override + public void doCallback(String message) { + + } + + /** + * Callback when connection is established. + * + * @param session + */ + @Override + public void onConnectionOpen(Object session) { + + } + + /** + * Callback when connection is closed. + * + * @param reason + */ + @Override + public void onConnectionClose(CloseReason reason) { + isTestComplete.set(true); + } + + /** + * Callback when there is an error in connection. + * + * @param cause + */ + @Override + public void onError(Throwable cause) { + isTestComplete.set(true); + } + + /** + * Callback when a text message is received. + * + * @param message + * @param session + */ + @Override + public void onMessageText(String message, Object session) { + recievedMessage = message; + isTestComplete.set(true); + } + + /** + * Callback when a binary message is received. + * + * @param message + * @param last + * @param session + */ + @Override + public void onMessageBinary(byte[] message, boolean last, + Object session) { + + } + } ); + + Assert.assertThat(client, instanceOf(javax.websocket.Endpoint.class)); + + Session session = container.connectToServer(client, serverUri); + + session.getBasicRemote().sendText(longMessage); + + while(!isTestComplete.get()) { + /* just wait for the test to finish */ + } + + Assert.assertEquals(longMessage, recievedMessage); + + } + + + +}
