Repository: cxf Updated Branches: refs/heads/master 2c2c74460 -> 627a47eb6
add status+headers in websocket response; enable all the tests for CXF-5339 Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/627a47eb Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/627a47eb Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/627a47eb Branch: refs/heads/master Commit: 627a47eb65a2e8cb3e9ebb6894f29d8fcc407daf Parents: 2c2c744 Author: Akitoshi Yoshida <a...@apache.org> Authored: Fri Feb 28 10:52:46 2014 +0100 Committer: Akitoshi Yoshida <a...@apache.org> Committed: Fri Feb 28 10:53:07 2014 +0100 ---------------------------------------------------------------------- .../transport/http_jetty/JettyWebSocket.java | 127 ++++++++++++++----- .../jaxrs/websocket/BookStoreWebSocket.java | 11 +- .../JAXRSClientServerWebSocketTest.java | 109 ++++++++++++++-- .../jaxrs/websocket/WebSocketTestClient.java | 23 +++- 4 files changed, 222 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/627a47eb/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyWebSocket.java ---------------------------------------------------------------------- diff --git a/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyWebSocket.java b/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyWebSocket.java index d20d816..30660aa99 100644 --- a/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyWebSocket.java +++ b/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyWebSocket.java @@ -60,7 +60,8 @@ import org.eclipse.jetty.websocket.WebSocket; class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessage { private static final Logger LOG = LogUtils.getL7dLogger(JettyWebSocket.class); - + private static final String CRLF = "\r\n"; + private JettyHTTPDestination jettyHTTPDestination; private ServletContext servletContext; private Connection webSocketConnection; @@ -142,9 +143,8 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa } } - @SuppressWarnings("unchecked") - <T> T getRequestProperty(String name, Class<T> cls) { - return (T)requestProperties.get(name); + private <T> T getRequestProperty(String name, Class<T> cls) { + return getValue(requestProperties, name, cls); } private WebSocketVirtualServletRequest createServletRequest(byte[] data, int offset, int length) @@ -163,14 +163,34 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa * @param offset * @param length */ - public void write(byte[] data, int offset, int length) throws IOException { + void write(byte[] data, int offset, int length) throws IOException { LOG.log(Level.INFO, "write(byte[], offset, length)"); webSocketConnection.sendMessage(data, offset, length); } - public ServletOutputStream getServletOutputStream() { + private static byte[] buildResponse(Map<String, String> headers, byte[] data, int offset, int length) { + StringBuilder sb = new StringBuilder(); + String v = headers.get("$sc"); + sb.append(v == null ? "200" : v).append(CRLF); + v = headers.get("Content-Type"); + if (v != null) { + sb.append("Content-Type: ").append(v).append(CRLF); + } + sb.append(CRLF); + + byte[] hb = sb.toString().getBytes(); + byte[] longdata = new byte[hb.length + length]; + System.arraycopy(hb, 0, longdata, 0, hb.length); + if (data != null && length > 0) { + System.arraycopy(data, offset, longdata, hb.length, length); + } + return longdata; + } + + ServletOutputStream getServletOutputStream(final Map<String, String> headers) { LOG.log(Level.INFO, "getServletOutputStream()"); return new ServletOutputStream() { + @Override public void write(int b) throws IOException { byte[] data = new byte[1]; @@ -180,14 +200,21 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public void write(byte[] data, int offset, int length) throws IOException { + if (headers.get("$flushed") == null) { + data = buildResponse(headers, data, offset, length); + offset = 0; + length = data.length; + headers.put("$flushed", "true"); + } webSocketConnection.sendMessage(data, offset, length); } }; } - public OutputStream getOutputStream() { + OutputStream getOutputStream(final Map<String, String> headers) { LOG.log(Level.INFO, "getServletOutputStream()"); return new OutputStream() { + @Override public void write(int b) throws IOException { byte[] data = new byte[1]; @@ -197,6 +224,12 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public void write(byte[] data, int offset, int length) throws IOException { + if (headers.get("$flushed") == null) { + data = buildResponse(headers, data, offset, length); + offset = 0; + length = data.length; + headers.put("$flushed", "true"); + } webSocketConnection.sendMessage(data, offset, length); } }; @@ -219,9 +252,9 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa requestHeaders = readHeaders(in); String path = requestHeaders.get("$path"); String origin = websocket.getRequestProperty("requestURI", String.class); - if (path.length() < origin.length()) { - //TODO use a more appropriate exception (invalidxxx?); - throw new IOException("invalid path: " + path + " not within " + origin); + if (path.startsWith(origin)) { + //REVISIT for now, log it here and reject the request later. + LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin}); } } @@ -518,16 +551,16 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public Enumeration<String> getHeaders(String name) { - // TODO Auto-generated method stub LOG.log(Level.INFO, "getHeaders"); + // our protocol assumes no multiple headers return Collections.enumeration(Arrays.asList(requestHeaders.get(name))); } @Override public int getIntHeader(String name) { - // TODO Auto-generated method stub LOG.log(Level.INFO, "getIntHeader"); - return 0; + String v = requestHeaders.get(name); + return v == null ? -1 : Integer.parseInt(v); } @Override @@ -587,9 +620,16 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public StringBuffer getRequestURL() { LOG.log(Level.INFO, "getRequestURL"); + String url = websocket.getRequestProperty("requestURL", String.class); String origin = websocket.getRequestProperty("requestURI", String.class); StringBuffer sb = new StringBuffer(); - sb.append(origin).append(getRequestURI().substring(origin.length())); + String uri = getRequestURI(); + //REVISIT the way to reject the requeist uri that does not match the original request + if (!uri.startsWith(origin)) { + sb.append(url).append("invalid").append(uri); + } else { + sb.append(url).append(uri.substring(origin.length())); + } return sb; } @@ -677,20 +717,31 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa LOG.log(Level.INFO, "logout"); } - } //TODO need to make the header setting to be written to the body (as symmetric to the request behavior) static class WebSocketVirtualServletResponse implements HttpServletResponse { private JettyWebSocket websocket; - + private Map<String, String> responseHeaders; + private boolean flushed; + public WebSocketVirtualServletResponse(JettyWebSocket websocket) { this.websocket = websocket; + this.responseHeaders = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER); } @Override public void flushBuffer() throws IOException { LOG.log(Level.INFO, "flushBuffer()"); + if (!flushed) { + //REVISIT this mechanism to determine if the headers have been flushed + if (responseHeaders.get("$flushed") == null) { + byte[] data = buildResponse(responseHeaders, null, 0, 0); + websocket.write(data, 0, data.length); + responseHeaders.put("$flushed", "true"); + } + flushed = true; + } } @Override @@ -708,9 +759,8 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public String getContentType() { - // TODO Auto-generated method stub LOG.log(Level.INFO, "getContentType()"); - return null; + return responseHeaders.get("Content-Type"); } @Override @@ -723,13 +773,13 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public ServletOutputStream getOutputStream() throws IOException { LOG.log(Level.INFO, "getOutputStream()"); - return websocket.getServletOutputStream(); + return websocket.getServletOutputStream(responseHeaders); } @Override public PrintWriter getWriter() throws IOException { LOG.log(Level.INFO, "getWriter()"); - return new PrintWriter(websocket.getOutputStream()); + return new PrintWriter(websocket.getOutputStream(responseHeaders)); } @Override @@ -763,18 +813,18 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public void setContentLength(int len) { - // TODO if (LOG.isLoggable(Level.INFO)) { LOG.log(Level.INFO, "setContentLength({0})", len); } + responseHeaders.put("Content-Length", Integer.toString(len)); } @Override public void setContentType(String type) { - // TODO Auto-generated method stub if (LOG.isLoggable(Level.INFO)) { LOG.log(Level.INFO, "setContentType({0})", type); } + responseHeaders.put("Content-Type", type); } @Override @@ -801,18 +851,18 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public void addHeader(String name, String value) { - // TODO if (LOG.isLoggable(Level.INFO)) { LOG.log(Level.INFO, "addHeader({0}, {1})", new Object[]{name, value}); } + responseHeaders.put(name, value); } @Override public void addIntHeader(String name, int value) { - // TODO if (LOG.isLoggable(Level.INFO)) { LOG.log(Level.INFO, "addIntHeader({0}, {1})", new Object[]{name, value}); } + responseHeaders.put(name, Integer.toString(value)); } @Override @@ -887,25 +937,26 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public int getStatus() { - // TODO Auto-generated method stub LOG.log(Level.INFO, "getStatus()"); - return 0; + String v = responseHeaders.get("$sc"); + return v == null ? 200 : Integer.parseInt(v); } @Override public void sendError(int sc) throws IOException { - // TODO Auto-generated method stub if (LOG.isLoggable(Level.INFO)) { LOG.log(Level.INFO, "sendError{0}", sc); } + responseHeaders.put("$sc", Integer.toString(sc)); } @Override public void sendError(int sc, String msg) throws IOException { - // TODO Auto-generated method stub if (LOG.isLoggable(Level.INFO)) { LOG.log(Level.INFO, "sendError({0}, {1})", new Object[]{sc, msg}); } + responseHeaders.put("$sc", Integer.toString(sc)); + responseHeaders.put("$sm", msg); } @Override @@ -942,17 +993,19 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa @Override public void setStatus(int sc) { - // TODO Auto-generated method stub if (LOG.isLoggable(Level.INFO)) { LOG.log(Level.INFO, "setStatus({0})", sc); } + responseHeaders.put("$sc", Integer.toString(sc)); } @Override public void setStatus(int sc, String sm) { - // TODO Auto-generated method stub - LOG.log(Level.INFO, "setStatus({0}, {1})", new Object[]{sc, sm}); - + if (LOG.isLoggable(Level.INFO)) { + LOG.log(Level.INFO, "setStatus({0}, {1})", new Object[]{sc, sm}); + } + responseHeaders.put("$sc", Integer.toString(sc)); + responseHeaders.put("$sm", sm); } } @@ -987,9 +1040,15 @@ class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessa return headers; } - ///// this is copied from AttachmentDeserializer. we may think about putting this method to IOUtils + @SuppressWarnings("unchecked") + private static <T> T getValue(Map<String, Object> properties, String name, Class<T> cls) { + return (T)properties.get(name); + } + + + ///// this is copied from AttachmentDeserializer with a minor change. think about putting this method to IOUtils private static String readLine(InputStream in) throws IOException { - StringBuffer buffer = new StringBuffer(128); + StringBuilder buffer = new StringBuilder(128); int c; http://git-wip-us.apache.org/repos/asf/cxf/blob/627a47eb/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java index 40ffee3..e658ca6 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java @@ -22,6 +22,8 @@ package org.apache.cxf.systest.jaxrs.websocket; import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; @@ -38,6 +40,8 @@ import org.apache.cxf.systest.jaxrs.Book; @Path("/web/bookstore") public class BookStoreWebSocket { + private static ExecutorService executor = Executors.newSingleThreadExecutor(); + @GET @Path("/booknames") @@ -50,7 +54,8 @@ public class BookStoreWebSocket { @Path("/booknames/servletstream") @Produces("text/plain") public void getBookNameStream(@Context HttpServletResponse response) throws Exception { - OutputStream os = response.getOutputStream(); + OutputStream os = response.getOutputStream(); + response.setContentType("text/plain"); os.write("CXF in Action".getBytes()); os.flush(); } @@ -78,7 +83,7 @@ public class BookStoreWebSocket { public void write(final OutputStream out) throws IOException, WebApplicationException { out.write(("Today: " + new java.util.Date()).getBytes()); // just for testing, using a thread - new Thread(new Runnable() { + executor.execute(new Runnable() { public void run() { try { for (int r = 2, i = 1; i <= 5; r *= 2, i++) { @@ -89,7 +94,7 @@ public class BookStoreWebSocket { e.printStackTrace(); } } - }).start(); + }); } }; } http://git-wip-us.apache.org/repos/asf/cxf/blob/627a47eb/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java index be9058d..583f3a4 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java @@ -27,7 +27,6 @@ import org.apache.cxf.jaxrs.model.AbstractResourceInfo; import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestBase { @@ -55,7 +54,10 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB assertTrue("one book must be returned", wsclient.await(3)); List<byte[]> received = wsclient.getReceivedBytes(); assertEquals(1, received.size()); - String value = new String(received.get(0)); + Response resp = new Response(received.get(0)); + assertEquals(200, resp.getStatusCode()); + assertEquals("text/plain", resp.getContentType()); + String value = new String(resp.getEntity()); assertEquals("CXF in Action", value); // call another GET service @@ -63,7 +65,10 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB wsclient.sendMessage("GET /web/bookstore/books/123".getBytes()); assertTrue("response expected", wsclient.await(3)); received = wsclient.getReceivedBytes(); - value = new String(received.get(0)); + resp = new Response(received.get(0)); + assertEquals(200, resp.getStatusCode()); + assertEquals("application/xml", resp.getContentType()); + value = new String(resp.getEntity()); assertTrue(value.startsWith("<?xml ") && value.endsWith("</Book>")); // call the POST service @@ -71,17 +76,25 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB wsclient.sendMessage("POST /web/bookstore/booksplain\r\nContent-Type: text/plain\r\n\r\n123".getBytes()); assertTrue("response expected", wsclient.await(3)); received = wsclient.getReceivedBytes(); - value = new String(received.get(0)); + resp = new Response(received.get(0)); + assertEquals(200, resp.getStatusCode()); + assertEquals("text/plain", resp.getContentType()); + value = new String(resp.getEntity()); assertEquals("123", value); // call the GET service returning a continous stream output wsclient.reset(6); wsclient.sendMessage("GET /web/bookstore/bookbought".getBytes()); - assertTrue("wrong method, no response expected", wsclient.await(5)); + assertTrue("response expected", wsclient.await(5)); received = wsclient.getReceivedBytes(); assertEquals(6, received.size()); - assertTrue((new String(received.get(0))).startsWith("Today:")); + resp = new Response(received.get(0)); + assertEquals(200, resp.getStatusCode()); + assertEquals("application/octet-stream", resp.getContentType()); + value = new String(resp.getEntity()); + assertTrue(value.startsWith("Today:")); for (int r = 2, i = 1; i < 6; r *= 2, i++) { + // subsequent data should not carry headers assertEquals(r, Integer.parseInt(new String(received.get(i)))); } } finally { @@ -100,7 +113,10 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB assertTrue("one book must be returned", wsclient.await(3)); List<byte[]> received = wsclient.getReceivedBytes(); assertEquals(1, received.size()); - String value = new String(received.get(0)); + Response resp = new Response(received.get(0)); + assertEquals(200, resp.getStatusCode()); + assertEquals("text/plain", resp.getContentType()); + String value = new String(resp.getEntity()); assertEquals("CXF in Action", value); } finally { wsclient.close(); @@ -108,7 +124,6 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB } @Test - @Ignore public void testWrongMethod() throws Exception { String address = "ws://localhost:" + PORT + "/web/bookstore"; @@ -117,13 +132,17 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB try { // call the GET service using POST wsclient.sendMessage("POST /web/bookstore/booknames".getBytes()); + assertTrue("error response expected", wsclient.await(3)); + List<byte[]> received = wsclient.getReceivedBytes(); + assertEquals(1, received.size()); + Response resp = new Response(received.get(0)); + assertEquals(405, resp.getStatusCode()); } finally { wsclient.close(); } } @Test - @Ignore public void testPathRestriction() throws Exception { String address = "ws://localhost:" + PORT + "/web/bookstore"; @@ -132,9 +151,81 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB try { // call the GET service over the different path wsclient.sendMessage("GET /bookstore2".getBytes()); + assertTrue("error response expected", wsclient.await(3)); + List<byte[]> received = wsclient.getReceivedBytes(); + assertEquals(1, received.size()); + Response resp = new Response(received.get(0)); + assertEquals(404, resp.getStatusCode()); } finally { wsclient.close(); } } + + //TODO this is a temporary way to verify the response; we should come up with something better. + private static class Response { + private byte[] data; + private int pos; + private int statusCode; + private String contentType; + private byte[] entity; + + public Response(byte[] data) { + this.data = data; + String line = readLine(); + statusCode = Integer.parseInt(line); + while ((line = readLine()) != null) { + if (line.length() > 0) { + int del = line.indexOf(':'); + String h = line.substring(0, del).trim(); + String v = line.substring(del + 1).trim(); + if ("Content-Type".equalsIgnoreCase(h)) { + contentType = v; + } + } + } + entity = new byte[data.length - pos]; + System.arraycopy(data, pos, entity, 0, entity.length); + } + + + + public int getStatusCode() { + return statusCode; + } + public String getContentType() { + return contentType; + } + + public byte[] getEntity() { + return entity; + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("Status: ").append(statusCode).append("\r\n"); + sb.append("Type: ").append(contentType).append("\r\n"); + sb.append("Entity: ").append(new String(entity)).append("\r\n"); + return sb.toString(); + } + + private String readLine() { + StringBuilder sb = new StringBuilder(); + while (pos < data.length) { + int c = 0xff & data[pos++]; + if (c == '\n') { + break; + } else if (c == '\r') { + continue; + } else { + sb.append((char)c); + } + } + if (sb.length() == 0) { + return null; + } + return sb.toString(); + } + } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/627a47eb/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java index 24af539..6dff1ac 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java @@ -147,9 +147,28 @@ class WebSocketTestClient { StringBuilder xbuf = new StringBuilder().append("\nHEX: "); StringBuilder cbuf = new StringBuilder().append("\nASC: "); for (byte b : data) { - xbuf.append(Integer.toHexString(0xff & b)).append(' '); - cbuf.append((0x80 & b) != 0 ? '.' : (char)b).append(" "); + writeHex(xbuf, 0xff & b); + writePrintable(cbuf, 0xff & b); } return xbuf.append(cbuf); } + + private static void writeHex(StringBuilder buf, int b) { + buf.append(Integer.toHexString(0x100 | (0xff & b)).substring(1)).append(' '); + } + + private static void writePrintable(StringBuilder buf, int b) { + if (b == 0x0d) { + buf.append("\\r"); + } else if (b == 0x0a) { + buf.append("\\n"); + } else if (b == 0x09) { + buf.append("\\t"); + } else if ((0x80 & b) != 0) { + buf.append('.').append(' '); + } else { + buf.append((char)b).append(' '); + } + buf.append(' '); + } }