Author: markt
Date: Mon Nov 10 16:47:25 2014
New Revision: 1637935
URL: http://svn.apache.org/r1637935
Log:
Push write methods down to SocketWrapper for NIO2
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java?rev=1637935&r1=1637934&r2=1637935&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletOutputStream.java Mon Nov 10 16:47:25 2014
@@ -16,167 +16,27 @@
*/
package org.apache.coyote.http11.upgrade;
-import java.io.EOFException;
import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.tomcat.util.net.Nio2Channel;
-import org.apache.tomcat.util.net.Nio2Endpoint;
-import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper;
import org.apache.tomcat.util.net.SocketWrapperBase;
public class Nio2ServletOutputStream extends
AbstractServletOutputStream<Nio2Channel> {
- private final Nio2Channel channel;
- private final int maxWrite;
- private final CompletionHandler<Integer, ByteBuffer> completionHandler;
- private final Semaphore writePending = new Semaphore(1);
-
public Nio2ServletOutputStream(SocketWrapperBase<Nio2Channel>
socketWrapper0,
int asyncWriteBufferSize) {
super(socketWrapper0, asyncWriteBufferSize);
- channel = socketWrapper0.getSocket();
- maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
- this.completionHandler = new CompletionHandler<Integer, ByteBuffer>() {
- @Override
- public void completed(Integer nBytes, ByteBuffer attachment) {
- if (nBytes.intValue() < 0) {
- failed(new EOFException(), attachment);
- } else if (attachment.hasRemaining()) {
- channel.write(attachment, socketWrapper.getTimeout(),
- TimeUnit.MILLISECONDS, attachment,
completionHandler);
- } else {
- writePending.release();
- if (!Nio2Endpoint.isInline()) {
-
socketWrapper.getEndpoint().processSocket(socketWrapper,
- SocketStatus.OPEN_WRITE, false);
- }
- }
- }
- @Override
- public void failed(Throwable exc, ByteBuffer attachment) {
- socketWrapper.setError(true);
- writePending.release();
- if (exc instanceof AsynchronousCloseException) {
- // If already closed, don't call onError and close again
- return;
- }
- onError(exc);
- socketWrapper.getEndpoint().processSocket(socketWrapper,
SocketStatus.ERROR, true);
- }
- };
}
@Override
protected int doWrite(boolean block, byte[] b, int off, int len)
throws IOException {
- int leftToWrite = len;
- int count = 0;
- int offset = off;
-
- while (leftToWrite > 0) {
- int writeThisLoop;
- int writtenThisLoop;
-
- if (leftToWrite > maxWrite) {
- writeThisLoop = maxWrite;
- } else {
- writeThisLoop = leftToWrite;
- }
-
- writtenThisLoop = doWriteInternal(block, b, offset, writeThisLoop);
- if (writtenThisLoop < 0) {
- throw new EOFException();
- }
- count += writtenThisLoop;
- if (!block && writePending.availablePermits() == 0) {
- // Prevent concurrent writes in non blocking mode,
- // leftover data has to be buffered
- return count;
- }
- offset += writtenThisLoop;
- leftToWrite -= writtenThisLoop;
-
- if (writtenThisLoop < writeThisLoop) {
- break;
- }
- }
-
- return count;
- }
-
- private int doWriteInternal(boolean block, byte[] b, int off, int len)
- throws IOException {
- ByteBuffer buffer = channel.getBufHandler().getWriteBuffer();
- int written = 0;
- if (block) {
- buffer.clear();
- buffer.put(b, off, len);
- buffer.flip();
- try {
- written =
channel.write(buffer).get(socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS).intValue();
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IOException) {
- onError(e.getCause());
- throw (IOException) e.getCause();
- } else {
- onError(e);
- throw new IOException(e);
- }
- } catch (InterruptedException e) {
- onError(e);
- throw new IOException(e);
- } catch (TimeoutException e) {
- SocketTimeoutException ex = new SocketTimeoutException();
- onError(ex);
- throw ex;
- }
- } else {
- if (writePending.tryAcquire()) {
- buffer.clear();
- buffer.put(b, off, len);
- buffer.flip();
- Nio2Endpoint.startInline();
- channel.write(buffer, socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS, buffer, completionHandler);
- Nio2Endpoint.endInline();
- written = len;
- }
- }
- return written;
+ return ((Nio2SocketWrapper) socketWrapper).write(block, b, off, len);
}
@Override
protected void doFlush() throws IOException {
- try {
- // Block until a possible non blocking write is done
- if (writePending.tryAcquire(socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS)) {
- writePending.release();
- channel.flush().get(socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS);
- } else {
- throw new TimeoutException();
- }
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IOException) {
- onError(e.getCause());
- throw (IOException) e.getCause();
- } else {
- onError(e);
- throw new IOException(e);
- }
- } catch (InterruptedException e) {
- onError(e);
- throw new IOException(e);
- } catch (TimeoutException e) {
- SocketTimeoutException ex = new SocketTimeoutException();
- onError(ex);
- throw ex;
- }
+ ((Nio2SocketWrapper) socketWrapper).flush();
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1637935&r1=1637934&r2=1637935&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Nov 10 16:47:25 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -736,18 +737,25 @@ public class Nio2Endpoint extends Abstra
private SendfileData sendfileData = null;
private boolean upgradeInit = false;
- private final CompletionHandler<Integer,
SocketWrapperBase<Nio2Channel>> completionHandler;
+ private final CompletionHandler<Integer,
SocketWrapperBase<Nio2Channel>> completionHandlerRead;
private boolean flipped = false;
private volatile boolean readPending = false;
private volatile boolean interest = true;
+ private final int maxWrite;
+ private final CompletionHandler<Integer, ByteBuffer>
completionHandlerWrite;
+ private final Semaphore writePending = new Semaphore(1);
+
+
public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) {
super(channel, endpoint);
- this.completionHandler = new CompletionHandler<Integer,
SocketWrapperBase<Nio2Channel>>() {
+ maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
+
+ this.completionHandlerRead = new CompletionHandler<Integer,
SocketWrapperBase<Nio2Channel>>() {
@Override
public void completed(Integer nBytes,
SocketWrapperBase<Nio2Channel> attachment) {
boolean notify = false;
- synchronized (completionHandler) {
+ synchronized (completionHandlerRead) {
if (nBytes.intValue() < 0) {
failed(new EOFException(), attachment);
} else {
@@ -773,6 +781,34 @@ public class Nio2Endpoint extends Abstra
getEndpoint().processSocket(attachment,
SocketStatus.ERROR, true);
}
};
+
+ this.completionHandlerWrite = new CompletionHandler<Integer,
ByteBuffer>() {
+ @Override
+ public void completed(Integer nBytes, ByteBuffer attachment) {
+ if (nBytes.intValue() < 0) {
+ failed(new EOFException(), attachment);
+ } else if (attachment.hasRemaining()) {
+ channel.write(attachment, getTimeout(),
+ TimeUnit.MILLISECONDS, attachment,
completionHandlerWrite);
+ } else {
+ writePending.release();
+ if (!Nio2Endpoint.isInline()) {
+ getEndpoint().processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, false);
+ }
+ }
+ }
+ @Override
+ public void failed(Throwable exc, ByteBuffer attachment) {
+ setError(true);
+ writePending.release();
+ if (exc instanceof AsynchronousCloseException) {
+ // If already closed, don't call onError and close again
+ return;
+ }
+ getEndpoint().processSocket(Nio2SocketWrapper.this,
SocketStatus.ERROR, true);
+ }
+ };
+
}
@Override
@@ -808,7 +844,7 @@ public class Nio2Endpoint extends Abstra
@Override
public boolean isReady() throws IOException {
- synchronized (completionHandler) {
+ synchronized (completionHandlerRead) {
if (readPending) {
interest = true;
return false;
@@ -843,7 +879,7 @@ public class Nio2Endpoint extends Abstra
@Override
public int read(boolean block, byte[] b, int off, int len) throws
IOException {
- synchronized (completionHandler) {
+ synchronized (completionHandlerRead) {
if (readPending) {
return 0;
}
@@ -940,7 +976,7 @@ public class Nio2Endpoint extends Abstra
flipped = false;
Nio2Endpoint.startInline();
getSocket().read(readBuffer, getTimeout(),
TimeUnit.MILLISECONDS,
- this, completionHandler);
+ this, completionHandlerRead);
Nio2Endpoint.endInline();
if (!readPending) {
nRead = readBuffer.position();
@@ -949,8 +985,106 @@ public class Nio2Endpoint extends Abstra
return nRead;
}
+
+ public int write(boolean block, byte[] b, int off, int len) throws IOException {
+ int leftToWrite = len;
+ int count = 0;
+ int offset = off;
+
+ while (leftToWrite > 0) {
+ int writeThisLoop;
+ int writtenThisLoop;
+
+ if (leftToWrite > maxWrite) {
+ writeThisLoop = maxWrite;
+ } else {
+ writeThisLoop = leftToWrite;
+ }
+
+ writtenThisLoop = writeInternal(block, b, offset,
writeThisLoop);
+ if (writtenThisLoop < 0) {
+ throw new EOFException();
+ }
+ count += writtenThisLoop;
+ if (!block && writePending.availablePermits() == 0) {
+ // Prevent concurrent writes in non blocking mode,
+ // leftover data has to be buffered
+ return count;
+ }
+ offset += writtenThisLoop;
+ leftToWrite -= writtenThisLoop;
+
+ if (writtenThisLoop < writeThisLoop) {
+ break;
+ }
+ }
+
+ return count;
+ }
+
+
+ private int writeInternal(boolean block, byte[] b, int off, int len)
+ throws IOException {
+ ByteBuffer buffer = getSocket().getBufHandler().getWriteBuffer();
+ int written = 0;
+ if (block) {
+ buffer.clear();
+ buffer.put(b, off, len);
+ buffer.flip();
+ try {
+ written = getSocket().write(buffer).get(getTimeout(),
TimeUnit.MILLISECONDS).intValue();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new IOException(e);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (TimeoutException e) {
+ SocketTimeoutException ex = new SocketTimeoutException();
+ throw ex;
+ }
+ } else {
+ if (writePending.tryAcquire()) {
+ buffer.clear();
+ buffer.put(b, off, len);
+ buffer.flip();
+ Nio2Endpoint.startInline();
+ getSocket().write(buffer, getTimeout(),
TimeUnit.MILLISECONDS, buffer, completionHandlerWrite);
+ Nio2Endpoint.endInline();
+ written = len;
+ }
+ }
+ return written;
+ }
+
+
+ public void flush() throws IOException {
+ try {
+ // Block until a possible non blocking write is done
+ if (writePending.tryAcquire(getTimeout(),
TimeUnit.MILLISECONDS)) {
+ writePending.release();
+ getSocket().flush().get(getTimeout(),
TimeUnit.MILLISECONDS);
+ } else {
+ throw new TimeoutException();
+ }
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new IOException(e);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (TimeoutException e) {
+ SocketTimeoutException ex = new SocketTimeoutException();
+ throw ex;
+ }
+ }
}
+
// ------------------------------------------------ Application Buffer Handler
public static class NioBufferHandler implements ApplicationBufferHandler {
private ByteBuffer readbuf = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]