Author: markt
Date: Sat Nov 24 17:26:51 2012
New Revision: 1413209
URL: http://svn.apache.org/viewvc?rev=1413209&view=rev
Log:
First cut of HTTP Upgrade for BIO with blocking
- NIO, APR still broken
- Non-blocking IO not implemented for Upgrade
- WebSocket still broken
Removed:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeInbound.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeOutbound.java
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties?rev=1413209&r1=1413208&r2=1413209&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties Sat Nov 24 17:26:51 2012
@@ -13,6 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+upgrade.sis.isFinished.ise=It is illegal to call isFinished() when the ServletInputStream is not in non-blocking mode (i.e. setReadListener() must be called first)
+upgrade.sis.isReady.ise=It is illegal to call isReady() when the ServletInputStream is not in non-blocking mode (i.e. setReadListener() must be called first)
+upgrade.sis.readListener.null=It is illegal to pass null to setReadListener()
+upgrade.sos.canWrite.ise=It is illegal to call canWrite() when the ServletOutputStream is not in non-blocking mode (i.e. setWriteListener() must be called first)
+upgrade.sos.writeListener.null=It is illegal to pass null to setWriteListener()
+
apr.error=Unexpected error [{0}] reading data from the APR/native socket.
nio.eof.error=Unexpected EOF read on the socket
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java?rev=1413209&r1=1413208&r2=1413209&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java Sat Nov 24 17:26:51 2012
@@ -27,59 +27,63 @@ import org.apache.tomcat.util.net.Socket
public class UpgradeBioProcessor extends UpgradeProcessor<Socket> {
- private final InputStream inputStream;
- private final OutputStream outputStream;
+ private static final int INFINITE_TIMEOUT = 0;
public UpgradeBioProcessor(SocketWrapper<Socket> wrapper,
ProtocolHandler httpUpgradeProcessor) throws IOException {
- super(upgradeInbound);
+ super(httpUpgradeProcessor, new BioUpgradeServletInputStream(wrapper),
+ new BioUpgradeServletOutputStream(wrapper));
- int timeout = upgradeInbound.getReadTimeout();
- if (timeout < 0) {
- timeout = 0;
- }
- wrapper.getSocket().setSoTimeout(timeout);
-
- this.inputStream = wrapper.getSocket().getInputStream();
- this.outputStream = wrapper.getSocket().getOutputStream();
+ wrapper.getSocket().setSoTimeout(INFINITE_TIMEOUT);
}
- /*
- * Output methods
- */
- @Override
- public void flush() throws IOException {
- outputStream.flush();
- }
+ // ----------------------------------------------------------- Inner classes
+ private static class BioUpgradeServletInputStream
+ extends UpgradeServletInputStream {
- @Override
- public void write(int b) throws IOException {
- outputStream.write(b);
- }
+ private final InputStream is;
+ public BioUpgradeServletInputStream(SocketWrapper<Socket> wrapper)
+ throws IOException {
+ is = wrapper.getSocket().getInputStream();
+ }
+
+ @Override
+ protected int doRead() throws IOException {
+ return is.read();
+ }
- @Override
- public void write(byte[]b, int off, int len) throws IOException {
- outputStream.write(b, off, len);
+ @Override
+ protected int doRead(byte[] b, int off, int len) throws IOException {
+ return is.read(b, off, len);
+ }
}
+ private static class BioUpgradeServletOutputStream
+ extends UpgradeServletOutputStream {
- /*
- * Input methods
- */
- @Override
- public int read() throws IOException {
- return inputStream.read();
- }
+ private final OutputStream os;
+
+ public BioUpgradeServletOutputStream(SocketWrapper<Socket> wrapper)
+ throws IOException {
+ os = wrapper.getSocket().getOutputStream();
+ }
+ @Override
+ protected void doWrite(int b) throws IOException {
+ os.write(b);
+ }
- @Override
- public int read(boolean block, byte[] bytes, int off, int len)
- throws IOException {
- // The BIO endpoint always uses blocking IO so the block parameter is
- // ignored and a blocking read is performed.
- return inputStream.read(bytes, off, len);
+ @Override
+ protected void doWrite(byte[] b, int off, int len) throws IOException {
+ os.write(b, off, len);
+ }
+
+ @Override
+ protected void doFlush() throws IOException {
+ os.flush();
+ }
}
}
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java?rev=1413209&r1=1413208&r2=1413209&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java Sat Nov 24 17:26:51 2012
@@ -19,6 +19,13 @@ package org.apache.coyote.http11.upgrade
import java.io.IOException;
import java.util.concurrent.Executor;
+import javax.servlet.ReadListener;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.ProtocolHandler;
+import javax.servlet.http.WebConnection;
+
import org.apache.coyote.Processor;
import org.apache.coyote.Request;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -27,73 +34,66 @@ import org.apache.tomcat.util.net.Socket
import org.apache.tomcat.util.net.SocketWrapper;
import org.apache.tomcat.util.res.StringManager;
-public abstract class UpgradeProcessor<S> implements Processor<S> {
+public abstract class UpgradeProcessor<S>
+ implements Processor<S>, WebConnection {
protected static final StringManager sm =
StringManager.getManager(Constants.Package);
- private final UpgradeInbound upgradeInbound;
+ private final ProtocolHandler httpUpgradeHandler;
+ private final ServletInputStream upgradeServletInputStream;
+ private final ServletOutputStream upgradeServletOutputStream;
+
+ protected UpgradeProcessor (ProtocolHandler httpUpgradeHandler,
+ UpgradeServletInputStream upgradeServletInputStream,
+ UpgradeServletOutputStream upgradeServletOutputStream) {
+ this.httpUpgradeHandler = httpUpgradeHandler;
+ this.upgradeServletInputStream = upgradeServletInputStream;
+ this.upgradeServletOutputStream = upgradeServletOutputStream;
+ this.httpUpgradeHandler.init(this);
+ }
+
- protected UpgradeProcessor (UpgradeInbound upgradeInbound) {
- this.upgradeInbound = upgradeInbound;
- upgradeInbound.setUpgradeProcessor(this);
- upgradeInbound.setUpgradeOutbound(new UpgradeOutbound(this));
- }
-
- // Output methods
- public abstract void flush() throws IOException;
- public abstract void write(int b) throws IOException;
- public abstract void write(byte[] b, int off, int len) throws IOException;
-
- // Input methods
- /**
- * This is always a blocking read of a single byte.
- *
- * @return The next byte or -1 if the end of the input is reached.
- *
- * @throws IOException If a problem occurs trying to read from the input
- */
- public abstract int read() throws IOException;
-
- /**
- * Read up to len bytes from the input in either blocking or non-blocking
- * mode (where non-blocking is supported). If the input does not support
- * non-blocking reads, a blcoking read will be performed.
- *
- * @param block
- * @param bytes
- * @param off
- * @param len
- * @return The number of bytes read or -1 if the end of the input is
- * reached. Non-blocking reads may return zero if no data is
- * available. Blocking reads never return zero.
- *
- * @throws IOException If a problem occurs trying to read from the input
- */
- public abstract int read(boolean block, byte[] bytes, int off, int len)
- throws IOException;
+ // --------------------------------------------------- WebConnection methods
@Override
- public final UpgradeInbound getUpgradeInbound() {
- return upgradeInbound;
+ public ServletInputStream getInputStream() throws IOException {
+ return upgradeServletInputStream;
}
@Override
- public final SocketState upgradeDispatch() throws IOException {
- return upgradeInbound.onData();
+ public ServletOutputStream getOutputStream() throws IOException {
+ return upgradeServletOutputStream;
}
+
+ // ------------------------------------------- Implemented Processor methods
+
@Override
public final boolean isUpgrade() {
return true;
}
@Override
+ public ProtocolHandler getHttpUpgradeHandler() {
+ return httpUpgradeHandler;
+ }
+
+ @Override
+ public final SocketState upgradeDispatch() throws IOException {
+
+ // TODO Handle read/write ready for non-blocking IO
+ return SocketState.UPGRADED;
+ }
+
+ @Override
public final void recycle(boolean socketClosing) {
// Currently a NO-OP as upgrade processors are not recycled.
}
- // NO-OP methods for upgrade
+
+ // ---------------------------- Processor methods that are NO-OP for upgrade
+
@Override
public final Executor getExecutor() {
return null;
@@ -139,4 +139,104 @@ public abstract class UpgradeProcessor<S
public final void setSslSupport(SSLSupport sslSupport) {
// NOOP
}
+
+
+ // ----------------------------------------------------------- Inner classes
+
+ protected abstract static class UpgradeServletInputStream extends
+ ServletInputStream {
+
+ private volatile ReadListener readListener = null;
+
+ @Override
+ public final boolean isFinished() {
+ if (readListener == null) {
+ throw new IllegalStateException(
+ sm.getString("upgrade.sis.isFinished.ise"));
+ }
+
+ // TODO Support non-blocking IO
+ return false;
+ }
+
+ @Override
+ public final boolean isReady() {
+ if (readListener == null) {
+ throw new IllegalStateException(
+ sm.getString("upgrade.sis.isReady.ise"));
+ }
+
+ // TODO Support non-blocking IO
+ return false;
+ }
+
+ @Override
+ public final void setReadListener(ReadListener listener) {
+ if (listener == null) {
+ throw new NullPointerException(
+ sm.getString("upgrade.sis.readListener.null"));
+ }
+ this.readListener = listener;
+ }
+
+ @Override
+ public final int read() throws IOException {
+ return doRead();
+ }
+
+ @Override
+ public final int read(byte[] b, int off, int len) throws IOException {
+ return doRead(b, off, len);
+ }
+
+ protected abstract int doRead() throws IOException;
+ protected abstract int doRead(byte[] b, int off, int len)
+ throws IOException;
+ }
+
+ protected abstract static class UpgradeServletOutputStream extends
+ ServletOutputStream {
+
+ private volatile WriteListener writeListener = null;
+
+ @Override
+ public boolean canWrite() {
+ if (writeListener == null) {
+ throw new IllegalStateException(
+ sm.getString("upgrade.sos.canWrite.ise"));
+ }
+
+ // TODO Support non-blocking IO
+ return false;
+ }
+
+ @Override
+ public void setWriteListener(WriteListener listener) {
+ if (listener == null) {
+ throw new NullPointerException(
+ sm.getString("upgrade.sos.writeListener.null"));
+ }
+ this.writeListener = listener;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ doWrite(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ doWrite(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ doFlush();
+ }
+
+ protected abstract void doWrite(int b) throws IOException;
+ protected abstract void doWrite(byte[] b, int off, int len)
+ throws IOException;
+ protected abstract void doFlush() throws IOException;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org