This is an automated email from the ASF dual-hosted git repository. remm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push: new 9d6388a Refactor async IO implementation to SocketWrapperBase 9d6388a is described below commit 9d6388affba4e9140c50e8ca8938569f8709d008 Author: remm <r...@apache.org> AuthorDate: Tue May 14 18:00:32 2019 +0200 Refactor async IO implementation to SocketWrapperBase Remove all duplicate code I could find, although it is likely there will be further tweaks needed. --- .../catalina/security/SecurityClassLoad.java | 10 +- java/org/apache/tomcat/util/net/AprEndpoint.java | 250 ++--------------- java/org/apache/tomcat/util/net/Nio2Endpoint.java | 224 ++------------- java/org/apache/tomcat/util/net/NioEndpoint.java | 245 ++--------------- .../apache/tomcat/util/net/SocketWrapperBase.java | 301 ++++++++++++++++++++- webapps/docs/changelog.xml | 4 + 6 files changed, 379 insertions(+), 655 deletions(-) diff --git a/java/org/apache/catalina/security/SecurityClassLoad.java b/java/org/apache/catalina/security/SecurityClassLoad.java index 05272f4..2a44caf 100644 --- a/java/org/apache/catalina/security/SecurityClassLoad.java +++ b/java/org/apache/catalina/security/SecurityClassLoad.java @@ -190,16 +190,14 @@ public final class SecurityClassLoad { loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableAdd"); loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableCancel"); loader.loadClass(basePackage + "util.net.NioBlockingSelector$BlockPoller$RunnableRemove"); - loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$OperationState"); - loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$VectoredIOCompletionHandler"); - loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$OperationState"); - loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$VectoredIOCompletionHandler"); - loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$OperationState"); - loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$VectoredIOCompletionHandler"); + loader.loadClass(basePackage + "util.net.AprEndpoint$AprSocketWrapper$AprOperationState"); + loader.loadClass(basePackage + "util.net.NioEndpoint$NioSocketWrapper$NioOperationState"); + loader.loadClass(basePackage + "util.net.Nio2Endpoint$Nio2SocketWrapper$Nio2OperationState"); loader.loadClass(basePackage + "util.net.SocketWrapperBase$BlockingMode"); loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionCheck"); loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionHandlerCall"); loader.loadClass(basePackage + "util.net.SocketWrapperBase$CompletionState"); + loader.loadClass(basePackage + "util.net.SocketWrapperBase$VectoredIOCompletionHandler"); // security loader.loadClass(basePackage + "util.security.PrivilegedGetTccl"); loader.loadClass(basePackage + "util.security.PrivilegedSetTccl"); diff --git a/java/org/apache/tomcat/util/net/AprEndpoint.java b/java/org/apache/tomcat/util/net/AprEndpoint.java index ad8e16d..d5898fe 100644 --- a/java/org/apache/tomcat/util/net/AprEndpoint.java +++ b/java/org/apache/tomcat/util/net/AprEndpoint.java @@ -22,9 +22,6 @@ import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; -import java.nio.channels.InterruptedByTimeoutException; -import java.nio.channels.ReadPendingException; -import java.nio.channels.WritePendingException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; @@ -2159,22 +2156,9 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB // This field should only be used by Poller#run() private int pollerFlags = 0; - private final Semaphore readPending; - private OperationState<?> readOperation = null; - private final Semaphore writePending; - private OperationState<?> writeOperation = null; - public AprSocketWrapper(Long socket, AprEndpoint endpoint) { super(socket, endpoint); - if (endpoint.getUseAsyncIO()) { - readPending = new Semaphore(1); - writePending = new Semaphore(1); - } else { - readPending = null; - writePending = null; - } - // TODO Make the socketWriteBuffer size configurable and align the // SSL and app buffer size settings with NIO & NIO2. if (endpoint.isSSLEnabled()) { @@ -2779,59 +2763,32 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB } @Override - public boolean hasAsyncIO() { - // The semaphores are only created if async IO is enabled - return (readPending != null); + protected <A> OperationState<A> newOperationState(boolean read, + ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler, + Semaphore semaphore, VectoredIOCompletionHandler<A> completion) { + return new AprOperationState<A>(read, buffers, offset, length, block, + timeout, unit, attachment, check, handler, semaphore, completion); } - /** - * Internal state tracker for scatter/gather operations. - */ - private class OperationState<A> implements Runnable { - private final boolean read; - private final ByteBuffer[] buffers; - private final int offset; - private final int length; - private final A attachment; - private final BlockingMode block; - private final CompletionCheck check; - private final CompletionHandler<Long, ? super A> handler; - private final Semaphore semaphore; - private final VectoredIOCompletionHandler<A> completion; - private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, - BlockingMode block, A attachment, CompletionCheck check, + private class AprOperationState<A> extends OperationState<A> { + private AprOperationState(boolean read, ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler<Long, ? super A> handler, Semaphore semaphore, VectoredIOCompletionHandler<A> completion) { - this.read = read; - this.buffers = buffers; - this.offset = offset; - this.length = length; - this.block = block; - this.attachment = attachment; - this.check = check; - this.handler = handler; - this.semaphore = semaphore; - this.completion = completion; - } - private volatile boolean inline = true; - private volatile long nBytes = 0; - private volatile CompletionState state = CompletionState.PENDING; - private boolean completionDone = true; - - public boolean process() { - try { - getEndpoint().getExecutor().execute(this); - } catch (RejectedExecutionException ree) { - log.warn(sm.getString("endpoint.executor.fail", AprSocketWrapper.this) , ree); - return false; - } catch (Throwable t) { - ExceptionUtils.handleThrowable(t); - // This means we got an OOM or similar creating a thread, or that - // the pool and its queue are full - log.error(sm.getString("endpoint.process.fail"), t); - return false; - } - return true; + super(read, buffers, offset, length, block, + timeout, unit, attachment, check, handler, semaphore, completion); + } + + @Override + protected boolean isInline() { + return inline; + } + + @Override + protected void start() { + run(); } @Override @@ -2914,168 +2871,5 @@ public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallB } - @Override - public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler); - } - - @Override - public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler); - } - - private <A> CompletionState readOrWrite(boolean read, - ByteBuffer[] buffers, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - IOException ioe = getError(); - if (ioe != null) { - handler.failed(ioe, attachment); - return CompletionState.ERROR; - } - if (timeout == -1) { - timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout()); - } else if (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout())) { - if (read) { - setReadTimeout(unit.toMillis(timeout)); - } else { - setWriteTimeout(unit.toMillis(timeout)); - } - } - if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { - try { - if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(e, attachment); - return CompletionState.ERROR; - } - } else { - if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) { - if (block == BlockingMode.NON_BLOCK) { - return CompletionState.NOT_DONE; - } else { - handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment); - return CompletionState.ERROR; - } - } - } - VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); - OperationState<A> state = new OperationState<>(read, buffers, offset, length, block, - attachment, check, handler, read ? readPending : writePending, completion); - if (read) { - readOperation = state; - } else { - writeOperation = state; - } - state.run(); - if (block == BlockingMode.BLOCK) { - synchronized (state) { - if (state.state == CompletionState.PENDING) { - try { - state.wait(unit.toMillis(timeout)); - if (state.state == CompletionState.PENDING) { - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } - } - } - return state.state; - } - - private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { - @Override - public void completed(Long nBytes, OperationState<A> state) { - if (nBytes.longValue() < 0) { - failed(new EOFException(), state); - } else { - state.nBytes += nBytes.longValue(); - CompletionState currentState = state.inline ? CompletionState.INLINE : CompletionState.DONE; - boolean complete = true; - boolean completion = true; - if (state.check != null) { - CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length); - if (call == CompletionHandlerCall.CONTINUE) { - complete = false; - } else if (call == CompletionHandlerCall.NONE) { - completion = false; - } - } - if (complete) { - boolean notify = false; - state.semaphore.release(); - if (state.read) { - readOperation = null; - } else { - writeOperation = null; - } - if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { - notify = true; - } else { - state.state = currentState; - } - if (completion && state.handler != null) { - state.handler.completed(Long.valueOf(state.nBytes), state.attachment); - } - synchronized (state) { - state.completionDone = true; - if (notify) { - state.state = currentState; - state.notify(); - } - } - } else { - synchronized (state) { - state.completionDone = true; - } - state.run(); - } - } - } - @Override - public void failed(Throwable exc, OperationState<A> state) { - IOException ioe = null; - if (exc instanceof InterruptedByTimeoutException) { - ioe = new SocketTimeoutException(); - exc = ioe; - } else if (exc instanceof IOException) { - ioe = (IOException) exc; - } - setError(ioe); - boolean notify = false; - state.semaphore.release(); - if (state.read) { - readOperation = null; - } else { - writeOperation = null; - } - if (state.block == BlockingMode.BLOCK) { - notify = true; - } else { - state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE; - } - if (state.handler != null) { - state.handler.failed(exc, state.attachment); - } - synchronized (state) { - state.completionDone = true; - if (notify) { - state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE; - state.notify(); - } - } - } - } - } } diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java index 8f2de8d..3c166b9 100644 --- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java +++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java @@ -29,15 +29,11 @@ import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.channels.FileChannel; -import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.NetworkChannel; -import java.nio.channels.ReadPendingException; -import java.nio.channels.WritePendingException; import java.nio.file.StandardOpenOption; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -466,13 +462,11 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS private SendfileData sendfileData = null; private final CompletionHandler<Integer, ByteBuffer> readCompletionHandler; - private final Semaphore readPending = new Semaphore(1); private boolean readInterest = false; // Guarded by readCompletionHandler private boolean readNotify = false; private final CompletionHandler<Integer, ByteBuffer> writeCompletionHandler; private final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler; - private final Semaphore writePending = new Semaphore(1); private boolean writeInterest = false; // Guarded by writeCompletionHandler private boolean writeNotify = false; private volatile boolean closed = false; @@ -951,63 +945,42 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS return getEndpoint().getUseAsyncIO(); } - /** - * Internal state tracker for scatter/gather operations. - */ - protected class OperationState<A> implements Runnable { - protected final boolean read; - protected final ByteBuffer[] buffers; - protected final int offset; - protected final int length; - protected final A attachment; - protected final long timeout; - protected final TimeUnit unit; - protected final BlockingMode block; - protected final CompletionCheck check; - protected final CompletionHandler<Long, ? super A> handler; - protected final Semaphore semaphore; - protected final VectoredIOCompletionHandler<A> completion; - protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, + @Override + public boolean needSemaphores() { + return true; + } + + @Override + public boolean hasPerOperationTimeout() { + return false; + } + + @Override + protected <A> OperationState<A> newOperationState(boolean read, + ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler, + Semaphore semaphore, VectoredIOCompletionHandler<A> completion) { + return new Nio2OperationState<A>(read, buffers, offset, length, block, + timeout, unit, attachment, check, handler, semaphore, completion); + } + + private class Nio2OperationState<A> extends OperationState<A> { + private Nio2OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler<Long, ? super A> handler, Semaphore semaphore, VectoredIOCompletionHandler<A> completion) { - this.read = read; - this.buffers = buffers; - this.offset = offset; - this.length = length; - this.block = block; - this.timeout = timeout; - this.unit = unit; - this.attachment = attachment; - this.check = check; - this.handler = handler; - this.semaphore = semaphore; - this.completion = completion; - } - protected volatile long nBytes = 0; - protected volatile CompletionState state = CompletionState.PENDING; - - public boolean isInline() { - return Nio2Endpoint.isInline(); + super(read, buffers, offset, length, block, + timeout, unit, attachment, check, handler, semaphore, completion); } - public boolean process() { - try { - getEndpoint().getExecutor().execute(this); - } catch (RejectedExecutionException ree) { - log.warn(sm.getString("endpoint.executor.fail", Nio2SocketWrapper.this) , ree); - return false; - } catch (Throwable t) { - ExceptionUtils.handleThrowable(t); - // This means we got an OOM or similar creating a thread, or that - // the pool and its queue are full - log.error(sm.getString("endpoint.process.fail"), t); - return false; - } - return true; + @Override + protected boolean isInline() { + return Nio2Endpoint.isInline(); } - public void start() { + @Override + protected void start() { if (read) { // Disable any regular read notifications caused by registerReadInterest readNotify = true; @@ -1051,7 +1024,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS } @Override public void failed(Throwable exc, Void attachment) { - handler.failed(exc, OperationState.this.attachment); + handler.failed(exc, Nio2OperationState.this.attachment); } }); return; @@ -1062,143 +1035,6 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS } } - @Override - public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler); - } - - @Override - public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler); - } - - private <A> CompletionState readOrWrite(boolean read, - ByteBuffer[] buffers, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - IOException ioe = getError(); - if (ioe != null) { - handler.failed(ioe, attachment); - return CompletionState.ERROR; - } - if (timeout == -1) { - timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout()); - } - if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { - try { - if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(e, attachment); - return CompletionState.ERROR; - } - } else { - if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) { - if (block == BlockingMode.NON_BLOCK) { - return CompletionState.NOT_DONE; - } else { - handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment); - return CompletionState.ERROR; - } - } - } - VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); - OperationState<A> state = new OperationState<>(read, buffers, offset, length, block, timeout, unit, - attachment, check, handler, read ? readPending : writePending, completion); - state.start(); - if (block == BlockingMode.BLOCK) { - synchronized (state) { - if (state.state == CompletionState.PENDING) { - try { - state.wait(unit.toMillis(timeout)); - if (state.state == CompletionState.PENDING) { - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } - } - } - return state.state; - } - - private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { - @Override - public void completed(Long nBytes, OperationState<A> state) { - if (nBytes.longValue() < 0) { - failed(new EOFException(), state); - } else { - state.nBytes += nBytes.longValue(); - CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE; - boolean complete = true; - boolean completion = true; - if (state.check != null) { - CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length); - if (call == CompletionHandlerCall.CONTINUE) { - complete = false; - } else if (call == CompletionHandlerCall.NONE) { - completion = false; - } - } - if (complete) { - boolean notify = false; - state.semaphore.release(); - if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { - notify = true; - } else { - state.state = currentState; - } - if (completion && state.handler != null) { - state.handler.completed(Long.valueOf(state.nBytes), state.attachment); - } - if (notify) { - synchronized (state) { - state.state = currentState; - state.notify(); - } - } - } else { - state.run(); - } - } - } - @Override - public void failed(Throwable exc, OperationState<A> state) { - IOException ioe = null; - if (exc instanceof InterruptedByTimeoutException) { - ioe = new SocketTimeoutException(); - exc = ioe; - } else if (exc instanceof IOException) { - ioe = (IOException) exc; - } - setError(ioe); - boolean notify = false; - state.semaphore.release(); - if (state.block == BlockingMode.BLOCK) { - notify = true; - } else { - state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE; - } - if (state.handler != null) { - state.handler.failed(exc, state.attachment); - } - if (notify) { - synchronized (state) { - state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE; - state.notify(); - } - } - } - } - /* Callers of this method must: * - have acquired the readPending semaphore * - have acquired a lock on readCompletionHandler diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index a6f0f62..bdebc73 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -29,19 +29,15 @@ import java.nio.channels.CancelledKeyException; import java.nio.channels.Channel; import java.nio.channels.CompletionHandler; import java.nio.channels.FileChannel; -import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.NetworkChannel; -import java.nio.channels.ReadPendingException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; -import java.nio.channels.WritePendingException; import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -1045,10 +1041,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> private int interestOps = 0; private CountDownLatch readLatch = null; private CountDownLatch writeLatch = null; - private final Semaphore readPending; - private OperationState<?> readOperation = null; - private final Semaphore writePending; - private OperationState<?> writeOperation = null; private volatile SendfileData sendfileData = null; private volatile long lastRead = System.currentTimeMillis(); private volatile long lastWrite = lastRead; @@ -1056,13 +1048,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) { super(channel, endpoint); - if (endpoint.getUseAsyncIO()) { - readPending = new Semaphore(1); - writePending = new Semaphore(1); - } else { - readPending = null; - writePending = null; - } pool = endpoint.getSelectorPool(); socketBufferHandler = channel.getBufHandler(); } @@ -1431,59 +1416,32 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } @Override - public boolean hasAsyncIO() { - // The semaphores are only created if async IO is enabled - return (readPending != null); + protected <A> OperationState<A> newOperationState(boolean read, + ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler, + Semaphore semaphore, VectoredIOCompletionHandler<A> completion) { + return new NioOperationState<A>(read, buffers, offset, length, block, + timeout, unit, attachment, check, handler, semaphore, completion); } - /** - * Internal state tracker for scatter/gather operations. - */ - private class OperationState<A> implements Runnable { - private final boolean read; - private final ByteBuffer[] buffers; - private final int offset; - private final int length; - private final A attachment; - private final BlockingMode block; - private final CompletionCheck check; - private final CompletionHandler<Long, ? super A> handler; - private final Semaphore semaphore; - private final VectoredIOCompletionHandler<A> completion; - private OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, - BlockingMode block, A attachment, CompletionCheck check, + private class NioOperationState<A> extends OperationState<A> { + private NioOperationState(boolean read, ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler<Long, ? super A> handler, Semaphore semaphore, VectoredIOCompletionHandler<A> completion) { - this.read = read; - this.buffers = buffers; - this.offset = offset; - this.length = length; - this.block = block; - this.attachment = attachment; - this.check = check; - this.handler = handler; - this.semaphore = semaphore; - this.completion = completion; + super(read, buffers, offset, length, block, + timeout, unit, attachment, check, handler, semaphore, completion); } - private volatile boolean inline = true; - private volatile long nBytes = 0; - private volatile CompletionState state = CompletionState.PENDING; - private boolean completionDone = true; - public boolean process() { - try { - getEndpoint().getExecutor().execute(this); - } catch (RejectedExecutionException ree) { - log.warn(sm.getString("endpoint.executor.fail", NioSocketWrapper.this) , ree); - return false; - } catch (Throwable t) { - ExceptionUtils.handleThrowable(t); - // This means we got an OOM or similar creating a thread, or that - // the pool and its queue are full - log.error(sm.getString("endpoint.process.fail"), t); - return false; - } - return true; + @Override + protected boolean isInline() { + return inline; + } + + @Override + protected void start() { + run(); } @Override @@ -1560,169 +1518,6 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } - @Override - public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler); - } - - @Override - public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler); - } - - private <A> CompletionState readOrWrite(boolean read, - ByteBuffer[] buffers, int offset, int length, - BlockingMode block, long timeout, TimeUnit unit, A attachment, - CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - IOException ioe = getError(); - if (ioe != null) { - handler.failed(ioe, attachment); - return CompletionState.ERROR; - } - if (timeout == -1) { - timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout()); - } else if (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout())) { - if (read) { - setReadTimeout(unit.toMillis(timeout)); - } else { - setWriteTimeout(unit.toMillis(timeout)); - } - } - if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { - try { - if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(e, attachment); - return CompletionState.ERROR; - } - } else { - if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) { - if (block == BlockingMode.NON_BLOCK) { - return CompletionState.NOT_DONE; - } else { - handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment); - return CompletionState.ERROR; - } - } - } - VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); - OperationState<A> state = new OperationState<>(read, buffers, offset, length, block, - attachment, check, handler, read ? readPending : writePending, completion); - if (read) { - readOperation = state; - } else { - writeOperation = state; - } - state.run(); - if (block == BlockingMode.BLOCK) { - synchronized (state) { - if (state.state == CompletionState.PENDING) { - try { - state.wait(unit.toMillis(timeout)); - if (state.state == CompletionState.PENDING) { - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } - } - } - return state.state; - } - - private class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { - @Override - public void completed(Long nBytes, OperationState<A> state) { - if (nBytes.longValue() < 0) { - failed(new EOFException(), state); - } else { - state.nBytes += nBytes.longValue(); - CompletionState currentState = state.inline ? CompletionState.INLINE : CompletionState.DONE; - boolean complete = true; - boolean completion = true; - if (state.check != null) { - CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length); - if (call == CompletionHandlerCall.CONTINUE) { - complete = false; - } else if (call == CompletionHandlerCall.NONE) { - completion = false; - } - } - if (complete) { - boolean notify = false; - state.semaphore.release(); - if (state.read) { - readOperation = null; - } else { - writeOperation = null; - } - if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { - notify = true; - } else { - state.state = currentState; - } - if (completion && state.handler != null) { - state.handler.completed(Long.valueOf(state.nBytes), state.attachment); - } - synchronized (state) { - state.completionDone = true; - if (notify) { - state.state = currentState; - state.notify(); - } - } - } else { - synchronized (state) { - state.completionDone = true; - } - state.run(); - } - } - } - @Override - public void failed(Throwable exc, OperationState<A> state) { - IOException ioe = null; - if (exc instanceof InterruptedByTimeoutException) { - ioe = new SocketTimeoutException(); - exc = ioe; - } else if (exc instanceof IOException) { - ioe = (IOException) exc; - } - setError(ioe); - boolean notify = false; - state.semaphore.release(); - if (state.read) { - readOperation = null; - } else { - writeOperation = null; - } - if (state.block == BlockingMode.BLOCK) { - notify = true; - } else { - state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE; - } - if (state.handler != null) { - state.handler.failed(exc, state.attachment); - } - synchronized (state) { - state.completionDone = true; - if (notify) { - state.state = state.inline ? CompletionState.ERROR : CompletionState.DONE; - state.notify(); - } - } - } - } - } diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java index b09284a..60e383d 100644 --- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java +++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java @@ -16,11 +16,17 @@ */ package org.apache.tomcat.util.net; +import java.io.EOFException; import java.io.IOException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; +import java.nio.channels.InterruptedByTimeoutException; +import java.nio.channels.ReadPendingException; +import java.nio.channels.WritePendingException; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -28,6 +34,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.res.StringManager; public abstract class SocketWrapperBase<E> { @@ -93,12 +100,24 @@ public abstract class SocketWrapperBase<E> { */ protected final WriteBuffer nonBlockingWriteBuffer = new WriteBuffer(bufferedWriteSize); + protected final Semaphore readPending; + protected OperationState<?> readOperation = null; + protected final Semaphore writePending; + protected OperationState<?> writeOperation = null; + public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) { this.socket = socket; this.endpoint = endpoint; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.blockingStatusReadLock = lock.readLock(); this.blockingStatusWriteLock = lock.writeLock(); + if (endpoint.getUseAsyncIO() || needSemaphores()) { + readPending = new Semaphore(1); + writePending = new Semaphore(1); + } else { + readPending = null; + writePending = null; + } } public E getSocket() { @@ -952,12 +971,191 @@ public abstract class SocketWrapperBase<E> { public static final CompletionCheck COMPLETE_READ = COMPLETE_WRITE; /** + * Internal state tracker for vectored operations. + */ + protected abstract class OperationState<A> implements Runnable { + protected final boolean read; + protected final ByteBuffer[] buffers; + protected final int offset; + protected final int length; + protected final A attachment; + protected final long timeout; + protected final TimeUnit unit; + protected final BlockingMode block; + protected final CompletionCheck check; + protected final CompletionHandler<Long, ? super A> handler; + protected final Semaphore semaphore; + protected final VectoredIOCompletionHandler<A> completion; + protected OperationState(boolean read, ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler, + Semaphore semaphore, VectoredIOCompletionHandler<A> completion) { + this.read = read; + this.buffers = buffers; + this.offset = offset; + this.length = length; + this.block = block; + this.timeout = timeout; + this.unit = unit; + this.attachment = attachment; + this.check = check; + this.handler = handler; + this.semaphore = semaphore; + this.completion = completion; + } + protected volatile long nBytes = 0; + protected volatile CompletionState state = CompletionState.PENDING; + protected volatile boolean inline = true; + protected boolean completionDone = true; + + /** + * @return true if the operation is still inline, false if the operation + * is running on a thread that is not the original caller + */ + protected abstract boolean isInline(); + + /** + * Process the operation using the connector executor. + * @return true if the operation was accepted, false if the executor + * rejected execurtion + */ + protected boolean process() { + try { + getEndpoint().getExecutor().execute(this); + } catch (RejectedExecutionException ree) { + log.warn(sm.getString("endpoint.executor.fail", SocketWrapperBase.this) , ree); + return false; + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + // This means we got an OOM or similar creating a thread, or that + // the pool and its queue are full + log.error(sm.getString("endpoint.process.fail"), t); + return false; + } + return true; + } + + /** + * Start the operation, this will typically call run. + */ + protected abstract void start(); + + } + + /** + * Completion handler for vectored operations. This will check the completion of the operation, + * then either continue or call the user provided completion handler. + */ + protected class VectoredIOCompletionHandler<A> implements CompletionHandler<Long, OperationState<A>> { + @Override + public void completed(Long nBytes, OperationState<A> state) { + if (nBytes.longValue() < 0) { + failed(new EOFException(), state); + } else { + state.nBytes += nBytes.longValue(); + CompletionState currentState = state.isInline() ? CompletionState.INLINE : CompletionState.DONE; + boolean complete = true; + boolean completion = true; + if (state.check != null) { + CompletionHandlerCall call = state.check.callHandler(currentState, state.buffers, state.offset, state.length); + if (call == CompletionHandlerCall.CONTINUE) { + complete = false; + } else if (call == CompletionHandlerCall.NONE) { + completion = false; + } + } + if (complete) { + boolean notify = false; + state.semaphore.release(); + if (state.read) { + readOperation = null; + } else { + writeOperation = null; + } + if (state.block == BlockingMode.BLOCK && currentState != CompletionState.INLINE) { + notify = true; + } else { + state.state = currentState; + } + if (completion && state.handler != null) { + state.handler.completed(Long.valueOf(state.nBytes), state.attachment); + } + synchronized (state) { + state.completionDone = true; + if (notify) { + state.state = currentState; + state.notify(); + } + } + } else { + synchronized (state) { + state.completionDone = true; + } + state.run(); + } + } + } + @Override + public void failed(Throwable exc, OperationState<A> state) { + IOException ioe = null; + if (exc instanceof InterruptedByTimeoutException) { + ioe = new SocketTimeoutException(); + exc = ioe; + } else if (exc instanceof IOException) { + ioe = (IOException) exc; + } + setError(ioe); + boolean notify = false; + state.semaphore.release(); + if (state.read) { + readOperation = null; + } else { + writeOperation = null; + } + if (state.block == BlockingMode.BLOCK) { + notify = true; + } else { + state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE; + } + if (state.handler != null) { + state.handler.failed(exc, state.attachment); + } + synchronized (state) { + state.completionDone = true; + if (notify) { + state.state = state.isInline() ? CompletionState.ERROR : CompletionState.DONE; + state.notify(); + } + } + } + } + + /** * Allows using NIO2 style read/write only for connectors that can * efficiently support it. * * @return This default implementation always returns {@code false} */ public boolean hasAsyncIO() { + // The semaphores are only created if async IO is enabled + return (readPending != null); + } + + /** + * Allows indicating if the connector needs semaphores. + * + * @return This default implementation always returns {@code false} + */ + public boolean needSemaphores() { + return false; + } + + /** + * Allows indicating if the connector supports per operation timeout. + * + * @return This default implementation always returns {@code false} + */ + public boolean hasPerOperationTimeout() { return false; } @@ -1086,7 +1284,7 @@ public abstract class SocketWrapperBase<E> { public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - throw new UnsupportedOperationException(); + return vectoredOperation(true, dsts, offset, length, block, timeout, unit, attachment, check, handler); } /** @@ -1169,12 +1367,111 @@ public abstract class SocketWrapperBase<E> { public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler<Long, ? super A> handler) { - throw new UnsupportedOperationException(); + return vectoredOperation(false, srcs, offset, length, block, timeout, unit, attachment, check, handler); + } + + + /** + * Vectored operation. The completion handler will be called once + * the operation is complete or an error occurred. If a CompletionCheck + * object has been provided, the completion handler will only be + * called if the callHandler method returned true. If no + * CompletionCheck object has been provided, the default NIO2 + * behavior is used: the completion handler will be called, even + * if the operation is incomplete, or if the operation completed inline. + * + * @param read true if the operation is a read, false if it is a write + * @param buffers buffers + * @param offset in the buffer array + * @param length in the buffer array + * @param block is the blocking mode that will be used for this operation + * @param timeout timeout duration for the write + * @param unit units for the timeout duration + * @param attachment an object to attach to the I/O operation that will be + * used when calling the completion handler + * @param check for the IO operation completion + * @param handler to call when the IO is complete + * @param <A> The attachment type + * @return the completion state (done, done inline, or still pending) + */ + protected <A> CompletionState vectoredOperation(boolean read, + ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler) { + IOException ioe = getError(); + if (ioe != null) { + handler.failed(ioe, attachment); + return CompletionState.ERROR; + } + if (timeout == -1) { + timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout()); + } else if (!hasPerOperationTimeout() && (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout()))) { + if (read) { + setReadTimeout(unit.toMillis(timeout)); + } else { + setWriteTimeout(unit.toMillis(timeout)); + } + } + if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { + try { + if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(e, attachment); + return CompletionState.ERROR; + } + } else { + if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) { + if (block == BlockingMode.NON_BLOCK) { + return CompletionState.NOT_DONE; + } else { + handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment); + return CompletionState.ERROR; + } + } + } + VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); + OperationState<A> state = newOperationState(read, buffers, offset, length, block, timeout, unit, + attachment, check, handler, read ? readPending : writePending, completion); + if (read) { + readOperation = state; + } else { + writeOperation = state; + } + state.start(); + if (block == BlockingMode.BLOCK) { + synchronized (state) { + if (state.state == CompletionState.PENDING) { + try { + state.wait(unit.toMillis(timeout)); + if (state.state == CompletionState.PENDING) { + return CompletionState.ERROR; + } + } catch (InterruptedException e) { + handler.failed(new SocketTimeoutException(), attachment); + return CompletionState.ERROR; + } + } + } + } + return state.state; } + protected abstract <A> OperationState<A> newOperationState(boolean read, + ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler<Long, ? super A> handler, + Semaphore semaphore, VectoredIOCompletionHandler<A> completion); // --------------------------------------------------------- Utility methods + protected static long toTimeout(long timeout) { + // Many calls can't do infinite timeout so use Long.MAX_VALUE if timeout is <= 0 + return (timeout > 0) ? timeout : Long.MAX_VALUE; + } + protected static int transfer(byte[] from, int offset, int length, ByteBuffer to) { int max = Math.min(length, to.remaining()); if (max > 0) { diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index 1bbaa2f..459c293 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -108,6 +108,10 @@ <fix> Avoid blocking write of internal buffer when using async IO. (remm) </fix> + <scode> + Refactor async IO implementation to the <code>SocketWrapperBase</code>. + (remm) + </scode> </changelog> </subsection> <subsection name="Other"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org