Author: markt
Date: Thu Feb 23 00:40:02 2012
New Revision: 1292598
URL: http://svn.apache.org/viewvc?rev=1292598&view=rev
Log:
Refactor
Added:
tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java
- copied, changed from r1292498,
tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java
Removed:
tomcat/trunk/java/org/apache/catalina/websocket/WsFrameHeader.java
Modified:
tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java
tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java
tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java
Modified: tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java?rev=1292598&r1=1292597&r2=1292598&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/StreamInbound.java Thu Feb 23 00:40:02 2012
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
-import java.nio.ByteBuffer;
import org.apache.coyote.http11.upgrade.UpgradeInbound;
import org.apache.coyote.http11.upgrade.UpgradeOutbound;
@@ -55,15 +54,15 @@ public abstract class StreamInbound impl
try {
WsInputStream wsIs = new WsInputStream(processor);
- WsFrameHeader header = wsIs.getFrameHeader();
+ WsFrame frame = wsIs.getFrame();
// TODO User defined extensions may define values for rsv
- if (header.getRsv() > 0) {
+ if (frame.getRsv() > 0) {
getOutbound().close(1002, null);
return SocketState.CLOSED;
}
- byte opCode = header.getOpCode();
+ byte opCode = frame.getOpCode();
if (opCode == Constants.OPCODE_BINARY) {
onBinaryData(wsIs);
@@ -75,22 +74,14 @@ public abstract class StreamInbound impl
return SocketState.UPGRADED;
}
- // Must be a control frame and control frames:
- // - have a limited payload length
- // - must not be fragmented
- if (wsIs.getPayloadLength() > 125 || !wsIs.getFrameHeader().getFin()) {
- getOutbound().close(1002, null);
- return SocketState.CLOSED;
- }
-
if (opCode == Constants.OPCODE_CLOSE){
- doClose(wsIs);
+ doClose(frame);
return SocketState.CLOSED;
} else if (opCode == Constants.OPCODE_PING) {
- doPing(wsIs);
+ doPing(frame);
return SocketState.UPGRADED;
} else if (opCode == Constants.OPCODE_PONG) {
- doPong(wsIs);
+ // NO-OP
return SocketState.UPGRADED;
}
@@ -105,70 +96,23 @@ public abstract class StreamInbound impl
}
}
- private void doClose(WsInputStream is) throws IOException {
- // Control messages have a max size of 125 bytes. Need to try and read
- // one more so we reach end of stream (less 2 for the status). Note that
- // the 125 byte limit is enforced in #onData() before this method is
- // ever called.
- ByteBuffer data = null;
-
- int status = is.read();
- if (status != -1) {
- status = status << 8;
- int i = is.read();
- if (i == -1) {
- // EOF during middle of close message. Closing anyway but set
- // close code to protocol error
- status = 1002;
- } else {
- status = status + i;
- if (is.getPayloadLength() > 2) {
- data = ByteBuffer.allocate((int) is.getPayloadLength() - 1);
- int read = 0;
- while (read > -1) {
- data.position(data.position() + read);
- read = is.read(data.array(), data.position(),
- data.remaining());
- }
- data.flip();
- }
- }
+ private void doClose(WsFrame frame) throws IOException {
+ if (frame.getPayLoadLength() > 0) {
+ // Must be status (2 bytes) plus optional message
+ if (frame.getPayLoadLength() == 1) {
+ throw new IOException();
+ }
+ int status = (frame.getPayLoad().get() & 0xFF) << 8;
+ status += frame.getPayLoad().get() & 0xFF;
+ getOutbound().close(status, frame.getPayLoad());
} else {
- status = 0;
- }
- getOutbound().close(status, data);
- }
-
- private void doPing(WsInputStream is) throws IOException {
- // Control messages have a max size of 125 bytes. Need to try and read
- // one more so we reach end of stream. Note that the 125 byte limit is
- // enforced in #onData() before this method is ever called.
- ByteBuffer data = null;
-
- if (is.getPayloadLength() > 0) {
- data = ByteBuffer.allocate((int) is.getPayloadLength() + 1);
-
- int read = 0;
- while (read > -1) {
- data.position(data.position() + read);
- read = is.read(data.array(), data.position(), data.remaining());
- }
-
- data.flip();
+ // No status
+ getOutbound().close(0, null);
}
-
- getOutbound().pong(data);
}
- private void doPong(WsInputStream is) throws IOException {
- // Unsolicited pong - swallow it
- // Control messages have a max size of 125 bytes. Note that the 125 byte
- // limit is enforced in #onData() before this method is ever called so
- // the loop below is not unbounded.
- int read = 0;
- while (read > -1) {
- read = is.read();
- }
+ private void doPing(WsFrame frame) throws IOException {
+ getOutbound().pong(frame.getPayLoad());
}
protected abstract void onBinaryData(InputStream is) throws IOException;
Copied: tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java (from r1292498, tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java)
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java?p2=tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java&p1=tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java&r1=1292498&r2=1292598&rev=1292598&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsFrame.java Thu Feb 23 00:40:02 2012
@@ -17,80 +17,102 @@
package org.apache.catalina.websocket;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.catalina.util.Conversions;
import org.apache.coyote.http11.upgrade.UpgradeProcessor;
/**
- * This class is used to read WebSocket frames from the underlying socket and
- * makes the payload available for reading as an {@link InputStream}. It only
- * makes the number of bytes declared in the payload length available for
- * reading even if more bytes are available from the socket.
+ * Represents a WebSocket frame with the exception of the payload for
+ * non-control frames.
*/
-public class WsInputStream extends java.io.InputStream {
+public class WsFrame {
- private UpgradeProcessor<?> processor;
- private WsFrameHeader wsFrameHeader;
- private long payloadLength = -1;
+ private final boolean fin;
+ private final int rsv;
+ private final byte opCode;
private int[] mask = new int[4];
+ private long payloadLength;
+ private ByteBuffer payload;
+ public WsFrame(UpgradeProcessor<?> processor) throws IOException {
- private long remaining;
- private long readThisFragment;
-
- public WsInputStream(UpgradeProcessor<?> processor) throws IOException {
- this.processor = processor;
-
- processFrameHeader();
- }
-
-
- private void processFrameHeader() throws IOException {
-
- // TODO: Per frame extension handling is not currently supported.
-
- // TODO: Handle control frames between fragments
-
- this.wsFrameHeader = new WsFrameHeader(processorRead());
+ int b = processorRead(processor);
+ fin = (b & 0x80) > 0;
+ rsv = (b & 0x70) >>> 4;
+ opCode = (byte) (b & 0x0F);
+ b = processorRead(processor);
// Client data must be masked
- int i = processorRead();
- if ((i & 0x80) == 0) {
+ if ((b & 0x80) == 0) {
// TODO: StringManager / i18n
throw new IOException("Client frame not masked");
}
- payloadLength = i & 0x7F;
+ payloadLength = b & 0x7F;
if (payloadLength == 126) {
byte[] extended = new byte[2];
- processorRead(extended);
+ processorRead(processor, extended);
payloadLength = Conversions.byteArrayToLong(extended);
} else if (payloadLength == 127) {
byte[] extended = new byte[8];
- processorRead(extended);
+ processorRead(processor, extended);
payloadLength = Conversions.byteArrayToLong(extended);
}
- remaining = payloadLength;
+
+ boolean isControl = (opCode & 0x08) > 0;
+
+ if (isControl) {
+ if (payloadLength > 125) {
+ throw new IOException();
+ }
+ if (!fin) {
+ throw new IOException();
+ }
+ }
for (int j = 0; j < mask.length; j++) {
- mask[j] = processor.read() & 0xFF;
+ mask[j] = processorRead(processor) & 0xFF;
+ }
+
+ if (isControl) {
+ // Note: Payload limited to <= 125 bytes by test above
+ payload = ByteBuffer.allocate((int) payloadLength);
+ processorRead(processor, payload);
+ } else {
+ payload = null;
}
+ }
- readThisFragment = 0;
+ public boolean getFin() {
+ return fin;
}
- public WsFrameHeader getFrameHeader() {
- return wsFrameHeader;
+ public int getRsv() {
+ return rsv;
}
- public long getPayloadLength() {
+ public byte getOpCode() {
+ return opCode;
+ }
+
+ public int[] getMask() {
+ return mask;
+ }
+
+ public long getPayLoadLength() {
return payloadLength;
}
+ public ByteBuffer getPayLoad() {
+ return payload;
+ }
+
// ----------------------------------- Guaranteed read methods for processor
- private int processorRead() throws IOException {
+ private int processorRead(UpgradeProcessor<?> processor)
+ throws IOException {
int result = processor.read();
if (result == -1) {
// TODO i18n
@@ -100,7 +122,8 @@ public class WsInputStream extends java.
}
- private void processorRead(byte[] bytes) throws IOException {
+ private void processorRead(UpgradeProcessor<?> processor, byte[] bytes)
+ throws IOException {
int read = 0;
int last = 0;
while (read < bytes.length) {
@@ -113,31 +136,21 @@ public class WsInputStream extends java.
}
}
- // ----------------------------------------------------- InputStream methods
- @Override
- public int read() throws IOException {
- while (remaining == 0 && !getFrameHeader().getFin()) {
- // Need more data - process next frame
- processFrameHeader();
-
- if (getFrameHeader().getOpCode() != Constants.OPCODE_CONTINUATION) {
+ /*
+ * Intended to read whole payload. Therefore able to unmask.
+ */
+ private void processorRead(UpgradeProcessor<?> processor, ByteBuffer bb)
+ throws IOException {
+ int last = 0;
+ while (bb.hasRemaining()) {
+ last = processor.read();
+ if (last == -1) {
// TODO i18n
- throw new IOException("Not a continuation frame");
+ throw new IOException("End of stream before end of frame");
}
+ bb.put((byte) (last ^ mask[bb.position() % 4]));
}
-
- if (remaining == 0) {
- return -1;
- }
-
- remaining--;
- readThisFragment++;
-
- int masked = processor.read();
- if(masked == -1) {
- return -1;
- }
- return masked ^ mask[(int) ((readThisFragment - 1) % 4)];
+ bb.flip();
}
}
Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java?rev=1292598&r1=1292597&r2=1292598&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsInputStream.java Thu Feb 23 00:40:02 2012
@@ -18,7 +18,6 @@ package org.apache.catalina.websocket;
import java.io.IOException;
-import org.apache.catalina.util.Conversions;
import org.apache.coyote.http11.upgrade.UpgradeProcessor;
/**
@@ -30,98 +29,36 @@ import org.apache.coyote.http11.upgrade.
public class WsInputStream extends java.io.InputStream {
private UpgradeProcessor<?> processor;
- private WsFrameHeader wsFrameHeader;
- private long payloadLength = -1;
- private int[] mask = new int[4];
-
+ private WsFrame frame;
private long remaining;
private long readThisFragment;
public WsInputStream(UpgradeProcessor<?> processor) throws IOException {
this.processor = processor;
-
- processFrameHeader();
+ processFrame();
}
-
- private void processFrameHeader() throws IOException {
-
- // TODO: Per frame extension handling is not currently supported.
-
- // TODO: Handle control frames between fragments
-
- this.wsFrameHeader = new WsFrameHeader(processorRead());
-
- // Client data must be masked
- int i = processorRead();
- if ((i & 0x80) == 0) {
- // TODO: StringManager / i18n
- throw new IOException("Client frame not masked");
- }
-
- payloadLength = i & 0x7F;
- if (payloadLength == 126) {
- byte[] extended = new byte[2];
- processorRead(extended);
- payloadLength = Conversions.byteArrayToLong(extended);
- } else if (payloadLength == 127) {
- byte[] extended = new byte[8];
- processorRead(extended);
- payloadLength = Conversions.byteArrayToLong(extended);
- }
- remaining = payloadLength;
-
- for (int j = 0; j < mask.length; j++) {
- mask[j] = processor.read() & 0xFF;
- }
-
- readThisFragment = 0;
+ public WsFrame getFrame() {
+ return frame;
}
- public WsFrameHeader getFrameHeader() {
- return wsFrameHeader;
- }
-
- public long getPayloadLength() {
- return payloadLength;
- }
-
-
- // ----------------------------------- Guaranteed read methods for processor
-
- private int processorRead() throws IOException {
- int result = processor.read();
- if (result == -1) {
- // TODO i18n
- throw new IOException("End of stream before end of frame");
- }
- return result;
+ private void processFrame() throws IOException {
+ frame = new WsFrame(processor);
+ readThisFragment = 0;
+ remaining = frame.getPayLoadLength();
}
- private void processorRead(byte[] bytes) throws IOException {
- int read = 0;
- int last = 0;
- while (read < bytes.length) {
- last = processor.read(bytes, read, bytes.length - read);
- if (last == -1) {
- // TODO i18n
- throw new IOException("End of stream before end of frame");
- }
- read += last;
- }
- }
-
// ----------------------------------------------------- InputStream methods
@Override
public int read() throws IOException {
- while (remaining == 0 && !getFrameHeader().getFin()) {
+ while (remaining == 0 && !getFrame().getFin()) {
// Need more data - process next frame
- processFrameHeader();
+ processFrame();
- if (getFrameHeader().getOpCode() != Constants.OPCODE_CONTINUATION) {
+ if (getFrame().getOpCode() != Constants.OPCODE_CONTINUATION) {
// TODO i18n
throw new IOException("Not a continuation frame");
}
@@ -138,6 +75,6 @@ public class WsInputStream extends java.
if(masked == -1) {
return -1;
}
- return masked ^ mask[(int) ((readThisFragment - 1) % 4)];
+ return masked ^ frame.getMask()[(int) ((readThisFragment - 1) % 4)];
}
}
Modified: tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java?rev=1292598&r1=1292597&r2=1292598&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java (original)
+++ tomcat/trunk/java/org/apache/catalina/websocket/WsOutbound.java Thu Feb 23 00:40:02 2012
@@ -126,15 +126,15 @@ public class WsOutbound {
upgradeOutbound.write(0x88);
if (status == 0) {
upgradeOutbound.write(0);
- } else if (data == null) {
+ } else if (data == null || data.position() == data.limit()) {
upgradeOutbound.write(2);
upgradeOutbound.write(status >>> 8);
upgradeOutbound.write(status);
} else {
- upgradeOutbound.write(2 + data.limit());
+ upgradeOutbound.write(2 + data.limit() - data.position());
upgradeOutbound.write(status >>> 8);
upgradeOutbound.write(status);
- upgradeOutbound.write(data.array(), 0, data.limit());
+ upgradeOutbound.write(data.array(), data.position(), data.limit());
}
upgradeOutbound.flush();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]