Author: markt
Date: Mon Feb 11 08:59:28 2013
New Revision: 1444684
URL: http://svn.apache.org/r1444684
Log:
OutputStream and Writer support
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1444684&r1=1444683&r2=1444684&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Mon
Feb 11 08:59:28 2013
@@ -218,8 +218,8 @@ public abstract class WsRemoteEndpointBa
sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
}
try {
- TextMessageSendHandler tmsh = new TextMessageSendHandler(
- completion, text, true, encoder, encoderBuffer, this);
+ TextMessageSendHandler tmsh = new
TextMessageSendHandler(completion,
+ CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
tmsh.write();
} finally {
writeLock.unlock();
@@ -230,25 +230,27 @@ public abstract class WsRemoteEndpointBa
@Override
public void sendPartialString(String fragment, boolean isLast)
throws IOException {
- boolean locked = writeLock.tryLock();
- if (!locked) {
- throw new IllegalStateException(
- sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
- }
- try {
- FutureToSendHandler f2sh = new FutureToSendHandler();
- TextMessageSendHandler tmsh = new TextMessageSendHandler(
- f2sh, fragment, isLast, encoder, encoderBuffer, this);
- tmsh.write();
- f2sh.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
- } finally {
- writeLock.unlock();
- }
+ sendPartialString(CharBuffer.wrap(fragment), isLast);
}
+ @Override
+ public OutputStream getSendStream() throws IOException {
+ return new WsOuputStream(this);
+ }
+
+
+ @Override
+ public Writer getSendWriter() throws IOException {
+ return new WsWriter(this);
+ }
+
+
+
+
+
+
+
/**
* Sends a control message, blocking until the message is sent.
@@ -279,6 +281,27 @@ public abstract class WsRemoteEndpointBa
}
+ void sendPartialString(CharBuffer fragment, boolean isLast)
+ throws IOException {
+ boolean locked = writeLock.tryLock();
+ if (!locked) {
+ throw new IllegalStateException(
+ sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
+ }
+ try {
+ FutureToSendHandler f2sh = new FutureToSendHandler();
+ TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh,
+ fragment, isLast, encoder, encoderBuffer, this);
+ tmsh.write();
+ f2sh.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+
private void sendMessage(byte opCode, ByteBuffer payload, boolean last,
SendHandler completion) {
@@ -337,18 +360,6 @@ public abstract class WsRemoteEndpointBa
@Override
- public OutputStream getSendStream() throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Writer getSendWriter() throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public void sendObject(Object o) throws IOException, EncodeException {
// TODO Auto-generated method stub
@@ -513,11 +524,11 @@ public abstract class WsRemoteEndpointBa
private final WsRemoteEndpointBase endpoint;
private volatile boolean isDone = false;
- public TextMessageSendHandler(SendHandler handler, String message,
+ public TextMessageSendHandler(SendHandler handler, CharBuffer message,
boolean isLast, CharsetEncoder encoder,
ByteBuffer encoderBuffer, WsRemoteEndpointBase endpoint) {
this.handler = handler;
- this.message = CharBuffer.wrap(message);
+ this.message = message;
this.isLast = isLast;
this.encoder = encoder.reset();
this.buffer = encoderBuffer;
@@ -707,4 +718,92 @@ public abstract class WsRemoteEndpointBa
return result;
}
}
+
+
+ private static class WsOuputStream extends OutputStream {
+
+ private final WsRemoteEndpointBase endpoint;
+ private final ByteBuffer buffer = ByteBuffer.allocate(8192);
+
+ public WsOuputStream(WsRemoteEndpointBase endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (buffer.remaining() == 0) {
+ flush();
+ }
+ buffer.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ int remaining = buffer.remaining();
+ int written = 0;
+
+ while (remaining < len) {
+ buffer.put(b, off + written, remaining);
+ written += remaining;
+ flush();
+ remaining = buffer.remaining();
+ }
+ buffer.put(b, off + written, len - written);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ doWrite(false);
+ }
+
+ @Override
+ public void close() throws IOException {
+ doWrite(true);
+ }
+
+ private void doWrite(boolean last) throws IOException {
+ endpoint.sendPartialBytes(buffer, last);
+ buffer.clear();
+ }
+ }
+
+
+ private static class WsWriter extends Writer {
+
+ private final WsRemoteEndpointBase endpoint;
+ private final CharBuffer buffer = CharBuffer.allocate(8192);
+
+ public WsWriter(WsRemoteEndpointBase endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public void write(char[] cbuf, int off, int len) throws IOException {
+ int remaining = buffer.remaining();
+ int written = 0;
+
+ while (remaining < len) {
+ buffer.put(cbuf, off + written, remaining);
+ written += remaining;
+ flush();
+ remaining = buffer.remaining();
+ }
+ buffer.put(cbuf, off + written, len - written);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ doWrite(false);
+ }
+
+ @Override
+ public void close() throws IOException {
+ doWrite(true);
+ }
+
+ private void doWrite(boolean last) throws IOException {
+ endpoint.sendPartialString(buffer, last);
+ buffer.clear();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]