Author: markt
Date: Mon Feb 11 08:59:28 2013
New Revision: 1444684

URL: http://svn.apache.org/r1444684
Log:
OutputStream and Writer support

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1444684&r1=1444683&r2=1444684&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Mon 
Feb 11 08:59:28 2013
@@ -218,8 +218,8 @@ public abstract class WsRemoteEndpointBa
                     sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
         }
         try {
-            TextMessageSendHandler tmsh = new TextMessageSendHandler(
-                    completion, text, true, encoder, encoderBuffer, this);
+            TextMessageSendHandler tmsh = new 
TextMessageSendHandler(completion,
+                    CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
             tmsh.write();
         } finally {
             writeLock.unlock();
@@ -230,25 +230,27 @@ public abstract class WsRemoteEndpointBa
     @Override
     public void sendPartialString(String fragment, boolean isLast)
             throws IOException {
-        boolean locked = writeLock.tryLock();
-        if (!locked) {
-            throw new IllegalStateException(
-                    sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
-        }
-        try {
-            FutureToSendHandler f2sh = new FutureToSendHandler();
-            TextMessageSendHandler tmsh = new TextMessageSendHandler(
-                    f2sh, fragment, isLast, encoder, encoderBuffer, this);
-            tmsh.write();
-            f2sh.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new IOException(e);
-        } finally {
-            writeLock.unlock();
-        }
+        sendPartialString(CharBuffer.wrap(fragment), isLast);
     }
 
 
+    @Override
+    public OutputStream getSendStream() throws IOException {
+        return new WsOuputStream(this);
+    }
+
+
+    @Override
+    public Writer getSendWriter() throws IOException {
+        return new WsWriter(this);
+    }
+
+
+
+
+
+
+
 
     /**
      * Sends a control message, blocking until the message is sent.
@@ -279,6 +281,27 @@ public abstract class WsRemoteEndpointBa
     }
 
 
+    void sendPartialString(CharBuffer fragment, boolean isLast)
+            throws IOException {
+        boolean locked = writeLock.tryLock();
+        if (!locked) {
+            throw new IllegalStateException(
+                    sm.getString("wsRemoteEndpoint.concurrentMessageSend"));
+        }
+        try {
+            FutureToSendHandler f2sh = new FutureToSendHandler();
+            TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh,
+                    fragment, isLast, encoder, encoderBuffer, this);
+            tmsh.write();
+            f2sh.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IOException(e);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+
     private void sendMessage(byte opCode, ByteBuffer payload, boolean last,
             SendHandler completion) {
 
@@ -337,18 +360,6 @@ public abstract class WsRemoteEndpointBa
 
 
     @Override
-    public OutputStream getSendStream() throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public Writer getSendWriter() throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
     public void sendObject(Object o) throws IOException, EncodeException {
         // TODO Auto-generated method stub
 
@@ -513,11 +524,11 @@ public abstract class WsRemoteEndpointBa
         private final WsRemoteEndpointBase endpoint;
         private volatile boolean isDone = false;
 
-        public TextMessageSendHandler(SendHandler handler, String message,
+        public TextMessageSendHandler(SendHandler handler, CharBuffer message,
                 boolean isLast, CharsetEncoder encoder,
                 ByteBuffer encoderBuffer, WsRemoteEndpointBase endpoint) {
             this.handler = handler;
-            this.message = CharBuffer.wrap(message);
+            this.message = message;
             this.isLast = isLast;
             this.encoder = encoder.reset();
             this.buffer = encoderBuffer;
@@ -707,4 +718,92 @@ public abstract class WsRemoteEndpointBa
             return result;
         }
     }
+
+
+    private static class WsOuputStream extends OutputStream {
+
+        private final WsRemoteEndpointBase endpoint;
+        private final ByteBuffer buffer = ByteBuffer.allocate(8192);
+
+        public WsOuputStream(WsRemoteEndpointBase endpoint) {
+            this.endpoint = endpoint;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            if (buffer.remaining() == 0) {
+                flush();
+            }
+            buffer.put((byte) b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            int remaining = buffer.remaining();
+            int written = 0;
+
+            while (remaining < len) {
+                buffer.put(b, off + written, remaining);
+                written += remaining;
+                flush();
+                remaining = buffer.remaining();
+            }
+            buffer.put(b, off + written, len - written);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            doWrite(false);
+        }
+
+        @Override
+        public void close() throws IOException {
+            doWrite(true);
+        }
+
+        private void doWrite(boolean last) throws IOException {
+            endpoint.sendPartialBytes(buffer, last);
+            buffer.clear();
+        }
+    }
+
+
+    private static class WsWriter extends Writer {
+
+        private final WsRemoteEndpointBase endpoint;
+        private final CharBuffer buffer = CharBuffer.allocate(8192);
+
+        public WsWriter(WsRemoteEndpointBase endpoint) {
+            this.endpoint = endpoint;
+        }
+
+        @Override
+        public void write(char[] cbuf, int off, int len) throws IOException {
+            int remaining = buffer.remaining();
+            int written = 0;
+
+            while (remaining < len) {
+                buffer.put(cbuf, off + written, remaining);
+                written += remaining;
+                flush();
+                remaining = buffer.remaining();
+            }
+            buffer.put(cbuf, off + written, len - written);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            doWrite(false);
+        }
+
+        @Override
+        public void close() throws IOException {
+            doWrite(true);
+        }
+
+        private void doWrite(boolean last) throws IOException {
+            endpoint.sendPartialString(buffer, last);
+            buffer.clear();
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to