Repository: cxf Updated Branches: refs/heads/3.0.x-fixes db43643b3 -> d84e03f7f
[CXF-5979] Allow some headers to be returned in WebSocket's streaming responses Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/d84e03f7 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/d84e03f7 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/d84e03f7 Branch: refs/heads/3.0.x-fixes Commit: d84e03f7f4d77f110b4dad910dcb2b737558861f Parents: db43643 Author: Akitoshi Yoshida <[email protected]> Authored: Fri Aug 29 15:41:53 2014 +0200 Committer: Akitoshi Yoshida <[email protected]> Committed: Fri Aug 29 16:34:08 2014 +0200 ---------------------------------------------------------------------- .../release/samples/jax_rs/websocket/README.txt | 14 +- .../demo/jaxrs/client/WebSocketTestClient.java | 34 +++-- .../java/demo/jaxrs/server/CustomerService.java | 19 ++- .../cxf/transport/websocket/WebSocketUtils.java | 93 +++++++++--- .../WebSocketVirtualServletResponse.java | 5 +- .../websocket/ahc/AhcWebSocketConduit.java | 35 +++-- .../transport/websocket/WebSocketUtilsTest.java | 82 +++++++++++ .../jaxrs/websocket/BookStoreWebSocket.java | 44 +++++- .../JAXRSClientServerWebSocketTest.java | 144 +++++++++++++++++++ .../jaxrs/websocket/WebSocketTestClient.java | 37 +++-- 10 files changed, 439 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/distribution/src/main/release/samples/jax_rs/websocket/README.txt ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/websocket/README.txt b/distribution/src/main/release/samples/jax_rs/websocket/README.txt index 19d8ecb..09c43e8 100644 --- a/distribution/src/main/release/samples/jax_rs/websocket/README.txt +++ b/distribution/src/main/release/samples/jax_rs/websocket/README.txt @@ -75,14 +75,24 @@ with the data: updates the customer instance whose id is 123 -A GET request to path /monitor +A GET request to path /monitor with id monitor-12345 ------------------------------------------------------------------------ GET /customerservice/monitor +requestId: monitor-12345 ------------------------------------------------------------------------ returns a continuous event stream on the customer -activities. +activities. Try invoking some customer related operations. + +A GET request to path /unmonitor with id monitor-12345 + +------------------------------------------------------------------------ +GET /customerservice/unmonitor/monitor-12345 +------------------------------------------------------------------------ + +unregisters the event stream and returns its status. + The client code demonstrates how to send GET/POST/PUT/DELETE requests over a websocket. http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java index 5db967c..ba5a7e8 100644 --- a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java +++ b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/client/WebSocketTestClient.java @@ -216,18 +216,21 @@ class WebSocketTestClient { public Response(Object data) { this.data = data; - String line = readLine(); - if (line != null) { - statusCode = Integer.parseInt(line); - while ((line = readLine()) != null) { - if (line.length() > 0) { - int del = line.indexOf(':'); - String h = line.substring(0, del).trim(); - String v = line.substring(del + 1).trim(); - if ("Content-Type".equalsIgnoreCase(h)) { - contentType = v; - } - } + String line; + boolean first = true; + while ((line = readLine()) != null) { + if (first && isStatusCode(line)) { + statusCode = Integer.parseInt(line); + continue; + } else { + first = false; + } + + int del = line.indexOf(':'); + String h = line.substring(0, del).trim(); + String v = line.substring(del + 1).trim(); + if ("Content-Type".equalsIgnoreCase(h)) { + contentType = v; } } if (data instanceof String) { @@ -237,8 +240,11 @@ class WebSocketTestClient { System.arraycopy((byte[])data, pos, (byte[])entity, 0, ((byte[])entity).length); } } - - + + private static boolean isStatusCode(String line) { + char c = line.charAt(0); + return '0' <= c && c <= '9'; + } public int getStatusCode() { return statusCode; http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java index 9ba3e05..2927d24 100644 --- a/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java +++ b/distribution/src/main/release/samples/jax_rs/websocket/src/main/java/demo/jaxrs/server/CustomerService.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import javax.ws.rs.DELETE; import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; @@ -39,6 +40,8 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; +import org.apache.cxf.transport.websocket.WebSocketConstants; + @Path("/customerservice/") @Produces("text/xml") public class CustomerService { @@ -47,7 +50,7 @@ public class CustomerService { long currentId = 123; Map<Long, Customer> customers = new HashMap<Long, Customer>(); Map<Long, Order> orders = new HashMap<Long, Order>(); - Set<OutputStream> monitors = new HashSet<OutputStream>(); + Map<String, OutputStream> monitors = new HashMap<String, OutputStream>(); public CustomerService() { init(); @@ -123,21 +126,29 @@ public class CustomerService { @GET @Path("/monitor") @Produces("text/*") - public StreamingOutput getBookBought() { + public StreamingOutput monitorCustomers(@HeaderParam(WebSocketConstants.DEFAULT_REQUEST_ID_KEY) String reqid) { + final String key = reqid == null ? "*" : reqid; return new StreamingOutput() { public void write(final OutputStream out) throws IOException, WebApplicationException { - monitors.add(out); + monitors.put(key, out); out.write(("Subscribed at " + new java.util.Date()).getBytes()); } }; } + @GET + @Path("/unmonitor/{key}") + @Produces("text/*") + public String unmonitorCustomers(@PathParam("key") String key) { + return (monitors.remove(key) != null ? "Removed: " : "Already removed: ") + key; + } + private void sendCustomerEvent(final String msg, final Customer customer) { executor.execute(new Runnable() { public void run() { try { String t = msg + ": " + customer.getId() + "/" + customer.getName(); - for (Iterator<OutputStream> it = monitors.iterator(); it.hasNext();) { + for (Iterator<OutputStream> it = monitors.values().iterator(); it.hasNext();) { OutputStream out = it.next(); try { out.write(t.getBytes()); http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java index 7bb27d2..a55639c 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java @@ -19,6 +19,7 @@ package org.apache.cxf.transport.websocket; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.Map; @@ -34,8 +35,8 @@ public final class WebSocketUtils { static final String SC_KEY = "$sc"; static final String SM_KEY = "$sm"; static final String FLUSHED_KEY = "$flushed"; - private static final String CRLF = "\r\n"; - private static final String COLSP = ": "; + private static final byte[] CRLF = "\r\n".getBytes(); + private static final byte[] COLSP = ": ".getBytes(); private static final String DEFAULT_SC = "200"; private WebSocketUtils() { @@ -125,38 +126,62 @@ public final class WebSocketUtils { * @return */ public static byte[] buildResponse(Map<String, String> headers, byte[] data, int offset, int length) { - StringBuilder sb = new StringBuilder(); + ByteArrayBuilder sb = new ByteArrayBuilder(); String v = headers.get(SC_KEY); sb.append(v == null ? DEFAULT_SC : v).append(CRLF); appendHeaders(headers, sb); - sb.append(CRLF); - byte[] longdata = sb.toString().getBytes(); + byte[] longdata = sb.toByteArray(); if (data != null && length > 0) { - final byte[] hb = longdata; - longdata = new byte[hb.length + length]; - System.arraycopy(hb, 0, longdata, 0, hb.length); - System.arraycopy(data, offset, longdata, hb.length, length); + longdata = buildResponse(longdata, data, offset, length); + } + return longdata; + } + + /** + * Build response bytes with some generated headers. + * + * @param headers + * @param data + * @param offset + * @param length + * @return + */ + public static byte[] buildResponse(byte[] headers, byte[] data, int offset, int length) { + final int hlen = headers != null ? headers.length : 0; + byte[] longdata = new byte[length + 2 + hlen]; + + if (hlen > 0) { + System.arraycopy(headers, 0, longdata, 0, hlen); } + longdata[hlen] = 0x0d; + longdata[hlen + 1] = 0x0a; + System.arraycopy(data, offset, longdata, hlen + 2, length); return longdata; } /** * Build response bytes without status and type information. * + * @param headers * @param data * @param offset * @param length * @return */ public static byte[] buildResponse(byte[] data, int offset, int length) { - byte[] longdata = new byte[length + 2]; - longdata[0] = 0x0d; - longdata[1] = 0x0a; - System.arraycopy(data, offset, longdata, 2, length); - return longdata; + return buildResponse((byte[])null, data, offset, length); } + static byte[] buildHeaderLine(String name, String value) { + byte[] hl = new byte[name.length() + COLSP.length + value.length() + CRLF.length]; + System.arraycopy(name.getBytes(), 0, hl, 0, name.length()); + System.arraycopy(COLSP, 0, hl, name.length(), COLSP.length); + System.arraycopy(value.getBytes(), 0, hl, name.length() + COLSP.length, value.length()); + System.arraycopy(CRLF, 0, hl, name.length() + COLSP.length + value.length(), CRLF.length); + return hl; + } + /** * Build request bytes with the specified method, url, headers, and content entity. * @@ -170,12 +195,12 @@ public final class WebSocketUtils { */ public static byte[] buildRequest(String method, String url, Map<String, String> headers, byte[] data, int offset, int length) { - StringBuilder sb = new StringBuilder(); + ByteArrayBuilder sb = new ByteArrayBuilder(); sb.append(method).append(' ').append(url).append(CRLF); appendHeaders(headers, sb); sb.append(CRLF); - byte[] longdata = sb.toString().getBytes(); + byte[] longdata = sb.toByteArray(); if (data != null && length > 0) { final byte[] hb = longdata; longdata = new byte[hb.length + length]; @@ -185,11 +210,45 @@ public final class WebSocketUtils { return longdata; } - private static void appendHeaders(Map<String, String> headers, StringBuilder sb) { + private static void appendHeaders(Map<String, String> headers, ByteArrayBuilder sb) { for (Entry<String, String> header : headers.entrySet()) { if (!header.getKey().startsWith("$")) { sb.append(header.getKey()).append(COLSP).append(header.getValue()).append(CRLF); } } } + + private static class ByteArrayBuilder { + private ByteArrayOutputStream baos; + public ByteArrayBuilder() { + baos = new ByteArrayOutputStream(); + } + + public ByteArrayBuilder append(byte[] b) { + try { + baos.write(b); + } catch (IOException e) { + // ignore; + } + return this; + } + + public ByteArrayBuilder append(String s) { + try { + baos.write(s.getBytes()); + } catch (IOException e) { + // ignore + } + return this; + } + + public ByteArrayBuilder append(char c) { + baos.write(c); + return this; + } + + public byte[] toByteArray() { + return baos.toByteArray(); + } + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java index 5bd6e7c..11aa712 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java @@ -338,7 +338,10 @@ public class WebSocketVirtualServletResponse implements HttpServletResponse { buffer.write(data, offset, length); } else { // unbuffered write to the socket - data = WebSocketUtils.buildResponse(data, offset, length); + String respid = responseHeaders.get(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY); + byte[] headers = respid != null + ? WebSocketUtils.buildHeaderLine(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, respid) : null; + data = WebSocketUtils.buildResponse(headers, data, offset, length); webSocketHolder.write(data, 0, data.length); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java index 612d986..3280cae 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/ahc/AhcWebSocketConduit.java @@ -375,20 +375,22 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit { public Response(String idKey, Object data) { this.data = data; - String line = readLine(); - if (line != null) { - statusCode = Integer.parseInt(line); - while ((line = readLine()) != null) { - if (line.length() > 0) { - int del = line.indexOf(':'); - String h = line.substring(0, del).trim(); - String v = line.substring(del + 1).trim(); - if ("Content-Type".equalsIgnoreCase(h)) { - contentType = v; - } else if (idKey.equals(h)) { - id = v; - } - } + String line; + boolean first = true; + while ((line = readLine()) != null) { + if (first && isStatusCode(line)) { + statusCode = Integer.parseInt(line); + continue; + } else { + first = false; + } + int del = line.indexOf(':'); + String h = line.substring(0, del).trim(); + String v = line.substring(del + 1).trim(); + if ("Content-Type".equalsIgnoreCase(h)) { + contentType = v; + } else if (WebSocketConstants.DEFAULT_RESPONSE_ID_KEY.equals(h)) { + id = v; } } if (data instanceof String) { @@ -399,6 +401,11 @@ public class AhcWebSocketConduit extends URLConnectionHTTPConduit { } } + private static boolean isStatusCode(String line) { + char c = line.charAt(0); + return '0' <= c && c <= '9'; + } + public int getStatusCode() { return statusCode; } http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/WebSocketUtilsTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/WebSocketUtilsTest.java b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/WebSocketUtilsTest.java new file mode 100644 index 0000000..257be24 --- /dev/null +++ b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/WebSocketUtilsTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.websocket; + +import java.util.Map; +import java.util.TreeMap; + +import org.junit.Assert; +import org.junit.Test; + +/** + * + */ +public class WebSocketUtilsTest extends Assert { + private static final byte[] TEST_BODY_BYTES = "buenos dias".getBytes(); + private static final byte[] TEST_HEADERS_BYTES = "200\r\nContent-Type: text/xml;charset=utf-8\r\n".getBytes(); + private static final byte[] TEST_ID_BYTES = + (WebSocketConstants.DEFAULT_RESPONSE_ID_KEY + ": 31415926-5358-9793-2384-626433832795\r\n").getBytes(); + private static final Map<String, String> TEST_HEADERS_MAP; + private static final byte[] CRLF = "\r\n".getBytes(); + + + static { + TEST_HEADERS_MAP = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER); + TEST_HEADERS_MAP.put(WebSocketUtils.SC_KEY, "200"); + TEST_HEADERS_MAP.put("Content-Type", "text/xml;charset=utf-8"); + TEST_HEADERS_MAP.put(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, "31415926-5358-9793-2384-626433832795"); + } + @Test + public void testBuildResponse() { + byte[] r = WebSocketUtils.buildResponse(TEST_BODY_BYTES, 0, TEST_BODY_BYTES.length); + verifyBytes(CRLF, 0, r, 0, 2); + verifyBytes(TEST_BODY_BYTES, 0, r, 2, TEST_BODY_BYTES.length); + assertEquals(2 + TEST_BODY_BYTES.length, r.length); + + r = WebSocketUtils.buildResponse(TEST_HEADERS_BYTES, TEST_BODY_BYTES, 0, TEST_BODY_BYTES.length); + verifyBytes(TEST_HEADERS_BYTES, 0, r, 0, TEST_HEADERS_BYTES.length); + verifyBytes(CRLF, 0, r, TEST_HEADERS_BYTES.length, 2); + verifyBytes(TEST_BODY_BYTES, 0, r, TEST_HEADERS_BYTES.length + 2, TEST_BODY_BYTES.length); + assertEquals(TEST_HEADERS_BYTES.length + 2 + TEST_BODY_BYTES.length, r.length); + + r = WebSocketUtils.buildResponse(TEST_HEADERS_MAP, TEST_BODY_BYTES, 0, TEST_BODY_BYTES.length); + verifyBytes(TEST_HEADERS_BYTES, 0, r, 0, TEST_HEADERS_BYTES.length); + verifyBytes(TEST_ID_BYTES, 0, r, TEST_HEADERS_BYTES.length, TEST_ID_BYTES.length); + verifyBytes(CRLF, 0, r, TEST_HEADERS_BYTES.length + TEST_ID_BYTES.length, 2); + verifyBytes(TEST_BODY_BYTES, 0, r, + TEST_HEADERS_BYTES.length + TEST_ID_BYTES.length + 2, TEST_BODY_BYTES.length); + assertEquals(TEST_HEADERS_BYTES.length + TEST_ID_BYTES.length + 2 + TEST_BODY_BYTES.length, r.length); + + // with some offset + r = WebSocketUtils.buildResponse(TEST_BODY_BYTES, 3, 3); + verifyBytes(CRLF, 0, r, 0, 2); + verifyBytes(TEST_BODY_BYTES, 3, r, 2, 3); + assertEquals(2 + 3, r.length); + } + + private void verifyBytes(byte[] expected, int epos, byte[] result, int rpos, int length) { + for (int i = 0; i < length; i++) { + if (result[rpos + i] != expected[epos + i]) { + fail("Wrong byte at position result[" + (rpos + i) + "]. Expected " + + expected[epos + i] + " but was " + result[rpos + i]); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java index 857bccc..eed72a3 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java @@ -23,12 +23,16 @@ package org.apache.cxf.systest.jaxrs.websocket; import java.io.IOException; import java.io.OutputStream; import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -39,11 +43,12 @@ import javax.ws.rs.core.StreamingOutput; import org.apache.cxf.jaxrs.ext.StreamingResponse; import org.apache.cxf.systest.jaxrs.Book; +import org.apache.cxf.transport.websocket.WebSocketConstants; @Path("/web/bookstore") public class BookStoreWebSocket { private static ExecutorService executor = Executors.newSingleThreadExecutor(); - + private Map<String, OutputStream> eventsStreams = new HashMap<String, OutputStream>(); @GET @Path("/booknames") @@ -136,6 +141,43 @@ public class BookStoreWebSocket { } return "Held from " + from + " for " + t + " ms"; } + + @GET + @Path("/events/register") + @Produces("text/plain") + public StreamingOutput registerEventsStream(@HeaderParam(WebSocketConstants.DEFAULT_REQUEST_ID_KEY) String reqid) { + final String key = reqid == null ? "*" : reqid; + return new StreamingOutput() { + public void write(final OutputStream out) throws IOException, WebApplicationException { + eventsStreams.put(key, out); + out.write(("Registered " + key + " at " + new java.util.Date()).getBytes()); + } + }; + + } + + @GET + @Path("/events/create/{name}") + @Produces("text/plain") + public String createEvent(@PathParam("name") String name) { + for (Iterator<OutputStream> it = eventsStreams.values().iterator(); it.hasNext();) { + OutputStream out = it.next(); + try { + out.write(("News: event " + name + " created").getBytes()); + } catch (IOException e) { + it.remove(); + e.printStackTrace(); + } + } + return name + " created"; + } + + @GET + @Path("/events/unregister/{key}") + @Produces("text/plain") + public String unregisterEventsStream(@PathParam("key") String key) { + return (eventsStreams.remove(key) != null ? "Unregistered: " : "Already Unregistered: ") + key; + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java index 01e4ccb..006bcbc 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java @@ -159,6 +159,39 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB } } + @Test + public void testGetBookStreamWithIDReferences() throws Exception { + String address = "ws://localhost:" + getPort() + getContext() + "/websocket/web/bookstore"; + + WebSocketTestClient wsclient = new WebSocketTestClient(address); + wsclient.connect(); + try { + wsclient.reset(5); + String reqid = UUID.randomUUID().toString(); + wsclient.sendMessage( + ("GET " + getContext() + "/websocket/web/bookstore/bookstream\r\nAccept: application/json\r\n" + + WebSocketConstants.DEFAULT_REQUEST_ID_KEY + ": " + reqid + "\r\n\r\n") + .getBytes()); + assertTrue("response expected", wsclient.await(5)); + List<WebSocketTestClient.Response> received = wsclient.getReceivedResponses(); + assertEquals(5, received.size()); + WebSocketTestClient.Response resp = received.get(0); + assertEquals(200, resp.getStatusCode()); + assertEquals("application/json", resp.getContentType()); + String value = resp.getTextEntity(); + assertEquals(value, getBookJson(1)); + for (int i = 2; i <= 5; i++) { + // subsequent data should not carry the status but the id header + resp = received.get(i - 1); + assertEquals(0, resp.getStatusCode()); + assertEquals(reqid, resp.getId()); + assertEquals(resp.getTextEntity(), getBookJson(i)); + } + } finally { + wsclient.close(); + } + } + private String getBookJson(int index) { return "{\"Book\":{\"id\":" + index + ",\"name\":\"WebSocket" + index + "\"}}"; } @@ -330,6 +363,117 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB } } + @Test + public void testStreamRegisterAndUnregister() throws Exception { + String address = "ws://localhost:" + getPort() + getContext() + "/websocket/web/bookstore"; + + WebSocketTestClient wsclient1 = new WebSocketTestClient(address); + WebSocketTestClient wsclient2 = new WebSocketTestClient(address); + wsclient1.connect(); + wsclient2.connect(); + try { + String regkey = UUID.randomUUID().toString(); + + EventCreatorRunner runner = new EventCreatorRunner(wsclient2, regkey, 1000, 1000); + new Thread(runner).start(); + + // register for the event stream with requestId ane expect to get 2 messages + wsclient1.reset(3); + wsclient1.sendTextMessage( + "GET " + getContext() + "/websocket/web/bookstore/events/register\r\n" + + WebSocketConstants.DEFAULT_REQUEST_ID_KEY + ": " + regkey + "\r\n\r\n"); + assertFalse("only 2 responses expected", wsclient1.await(5)); + List<WebSocketTestClient.Response> received = wsclient1.getReceivedResponses(); + assertEquals(2, received.size()); + + // the first response is the registration confirmation + WebSocketTestClient.Response resp = received.get(0); + assertEquals(200, resp.getStatusCode()); + assertEquals("text/plain", resp.getContentType()); + String value = resp.getTextEntity(); + assertTrue(value.startsWith("Registered " + regkey)); + String id = resp.getId(); + assertEquals("unexpected responseId", regkey, id); + + // the second response is the event news + resp = received.get(1); + assertEquals(0, resp.getStatusCode()); + value = resp.getTextEntity(); + assertEquals("News: event Hello created", value); + id = resp.getId(); + assertEquals("unexpected responseId", regkey, id); + + String[] values = runner.getValues(); + assertTrue(runner.isCompleted()); + assertEquals("Hello created", values[0]); + assertTrue(values[1].startsWith("Unregistered: " + regkey)); + assertEquals("Hola created", values[2]); + } finally { + wsclient1.close(); + wsclient2.close(); + } + } + + private class EventCreatorRunner implements Runnable { + private WebSocketTestClient wsclient; + private String key; + private long delay1; + private long delay2; + private String[] values = new String[3]; + private boolean completed; + + public EventCreatorRunner(WebSocketTestClient wsclient, String key, long delay1, long delay2) { + this.wsclient = wsclient; + this.key = key; + this.delay1 = delay1; + this.delay2 = delay2; + } + + public void run() { + try { + Thread.sleep(delay1); + // creating an event and the event stream will see this event + wsclient.sendTextMessage( + "GET " + getContext() + "/websocket/web/bookstore/events/create/Hello\r\n\r\n"); + assertTrue("response expected", wsclient.await(3)); + List<WebSocketTestClient.Response> received = wsclient.getReceivedResponses(); + WebSocketTestClient.Response resp = received.get(0); + values[0] = resp.getTextEntity(); + + Thread.sleep(delay2); + wsclient.reset(1); + // unregistering the event stream + wsclient.sendTextMessage( + "GET " + getContext() + "/websocket/web/bookstore/events/unregister/" + key + "\r\n\r\n"); + assertTrue("response expected", wsclient.await(3)); + received = wsclient.getReceivedResponses(); + resp = received.get(0); + values[1] = resp.getTextEntity(); + + wsclient.reset(1); + // creating another event and the event stream will not see this event + wsclient.sendTextMessage( + "GET " + getContext() + "/websocket/web/bookstore/events/create/Hola\r\n\r\n"); + assertTrue("response expected", wsclient.await(3)); + received = wsclient.getReceivedResponses(); + resp = received.get(0); + values[2] = resp.getTextEntity(); + } catch (InterruptedException e) { + // ignore + } finally { + completed = true; + } + } + + public String[] getValues() { + return values; + } + + public boolean isCompleted() { + return completed; + } + } + protected String getPort() { return PORT; } http://git-wip-us.apache.org/repos/asf/cxf/blob/d84e03f7/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java index 7e45544..09e61c2 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java @@ -221,20 +221,23 @@ class WebSocketTestClient { public Response(Object data) { this.data = data; - String line = readLine(); - if (line != null) { - statusCode = Integer.parseInt(line); - while ((line = readLine()) != null) { - if (line.length() > 0) { - int del = line.indexOf(':'); - String h = line.substring(0, del).trim(); - String v = line.substring(del + 1).trim(); - if ("Content-Type".equalsIgnoreCase(h)) { - contentType = v; - } else if (WebSocketConstants.DEFAULT_RESPONSE_ID_KEY.equals(h)) { - id = v; - } - } + String line; + boolean first = true; + while ((line = readLine()) != null) { + if (first && isStatusCode(line)) { + statusCode = Integer.parseInt(line); + continue; + } else { + first = false; + } + + int del = line.indexOf(':'); + String h = line.substring(0, del).trim(); + String v = line.substring(del + 1).trim(); + if ("Content-Type".equalsIgnoreCase(h)) { + contentType = v; + } else if (WebSocketConstants.DEFAULT_RESPONSE_ID_KEY.equals(h)) { + id = v; } } if (data instanceof String) { @@ -245,7 +248,11 @@ class WebSocketTestClient { } } - + private static boolean isStatusCode(String line) { + char c = line.charAt(0); + return '0' <= c && c <= '9'; + } + public int getStatusCode() { return statusCode; }
