Author: markt
Date: Mon Feb 11 08:59:28 2013
New Revision: 1444684

URL: http://svn.apache.org/r1444684
Log:
OutputStream and Writer support

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1444684&r1=1444683&r2=1444684&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Mon Feb 11 08:59:28 2013 @@ -218,8 +218,8 @@ public abstract class WsRemoteEndpointBa sm.getString("wsRemoteEndpoint.concurrentMessageSend")); } try { - TextMessageSendHandler tmsh = new TextMessageSendHandler( - completion, text, true, encoder, encoderBuffer, this); + TextMessageSendHandler tmsh = new TextMessageSendHandler(completion, + CharBuffer.wrap(text), true, encoder, encoderBuffer, this); tmsh.write(); } finally { writeLock.unlock(); @@ -230,25 +230,27 @@ public abstract class WsRemoteEndpointBa @Override public void sendPartialString(String fragment, boolean isLast) throws IOException { - boolean locked = writeLock.tryLock(); - if (!locked) { - throw new IllegalStateException( - sm.getString("wsRemoteEndpoint.concurrentMessageSend")); - } - try { - FutureToSendHandler f2sh = new FutureToSendHandler(); - TextMessageSendHandler tmsh = new TextMessageSendHandler( - f2sh, fragment, isLast, encoder, encoderBuffer, this); - tmsh.write(); - f2sh.get(); - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); - } finally { - writeLock.unlock(); - } + sendPartialString(CharBuffer.wrap(fragment), isLast); } + @Override + public OutputStream getSendStream() throws IOException { + return new WsOuputStream(this); + } + + + @Override + public Writer getSendWriter() throws IOException { + return new WsWriter(this); + } + + + + + + + /** * Sends a control message, blocking until the message is 
sent. @@ -279,6 +281,27 @@ public abstract class WsRemoteEndpointBa } + void sendPartialString(CharBuffer fragment, boolean isLast) + throws IOException { + boolean locked = writeLock.tryLock(); + if (!locked) { + throw new IllegalStateException( + sm.getString("wsRemoteEndpoint.concurrentMessageSend")); + } + try { + FutureToSendHandler f2sh = new FutureToSendHandler(); + TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh, + fragment, isLast, encoder, encoderBuffer, this); + tmsh.write(); + f2sh.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); + } finally { + writeLock.unlock(); + } + } + + private void sendMessage(byte opCode, ByteBuffer payload, boolean last, SendHandler completion) { @@ -337,18 +360,6 @@ public abstract class WsRemoteEndpointBa @Override - public OutputStream getSendStream() throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public Writer getSendWriter() throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override public void sendObject(Object o) throws IOException, EncodeException { // TODO Auto-generated method stub @@ -513,11 +524,11 @@ public abstract class WsRemoteEndpointBa private final WsRemoteEndpointBase endpoint; private volatile boolean isDone = false; - public TextMessageSendHandler(SendHandler handler, String message, + public TextMessageSendHandler(SendHandler handler, CharBuffer message, boolean isLast, CharsetEncoder encoder, ByteBuffer encoderBuffer, WsRemoteEndpointBase endpoint) { this.handler = handler; - this.message = CharBuffer.wrap(message); + this.message = message; this.isLast = isLast; this.encoder = encoder.reset(); this.buffer = encoderBuffer; @@ -707,4 +718,92 @@ public abstract class WsRemoteEndpointBa return result; } } + + + private static class WsOuputStream extends OutputStream { + + private final WsRemoteEndpointBase endpoint; + private final ByteBuffer buffer = 
ByteBuffer.allocate(8192); + + public WsOuputStream(WsRemoteEndpointBase endpoint) { + this.endpoint = endpoint; + } + + @Override + public void write(int b) throws IOException { + if (buffer.remaining() == 0) { + flush(); + } + buffer.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + int remaining = buffer.remaining(); + int written = 0; + + while (remaining < len) { + buffer.put(b, off + written, remaining); + written += remaining; + flush(); + remaining = buffer.remaining(); + } + buffer.put(b, off + written, len - written); + } + + @Override + public void flush() throws IOException { + doWrite(false); + } + + @Override + public void close() throws IOException { + doWrite(true); + } + + private void doWrite(boolean last) throws IOException { + endpoint.sendPartialBytes(buffer, last); + buffer.clear(); + } + } + + + private static class WsWriter extends Writer { + + private final WsRemoteEndpointBase endpoint; + private final CharBuffer buffer = CharBuffer.allocate(8192); + + public WsWriter(WsRemoteEndpointBase endpoint) { + this.endpoint = endpoint; + } + + @Override + public void write(char[] cbuf, int off, int len) throws IOException { + int remaining = buffer.remaining(); + int written = 0; + + while (remaining < len) { + buffer.put(cbuf, off + written, remaining); + written += remaining; + flush(); + remaining = buffer.remaining(); + } + buffer.put(cbuf, off + written, len - written); + } + + @Override + public void flush() throws IOException { + doWrite(false); + } + + @Override + public void close() throws IOException { + doWrite(true); + } + + private void doWrite(boolean last) throws IOException { + endpoint.sendPartialString(buffer, last); + buffer.clear(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org