Author: remm
Date: Tue Mar 28 08:15:05 2017
New Revision: 1789065
URL: http://svn.apache.org/viewvc?rev=1789065&view=rev
Log:
Better blocking for async IO.
Modified:
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1789065&r1=1789064&r2=1789065&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Tue Mar 28
08:15:05 2017
@@ -852,14 +852,16 @@ public class Nio2Endpoint extends Abstra
private final A attachment;
private final long timeout;
private final TimeUnit unit;
+ private final BlockingMode block;
private final CompletionCheck check;
private final CompletionHandler<Long, ? super A> handler;
private OperationState(ByteBuffer[] buffers, int offset, int
length,
- long timeout, TimeUnit unit, A attachment, CompletionCheck
check,
- CompletionHandler<Long, ? super A> handler) {
+ BlockingMode block, long timeout, TimeUnit unit, A
attachment,
+ CompletionCheck check, CompletionHandler<Long, ? super A>
handler) {
this.buffers = buffers;
this.offset = offset;
this.length = length;
+ this.block = block;
this.timeout = timeout;
this.unit = unit;
this.attachment = attachment;
@@ -894,7 +896,14 @@ public class Nio2Endpoint extends Abstra
}
if (complete) {
readPending.release();
- state.state = currentState;
+ if (state.block == BlockingMode.BLOCK && currentState
!= CompletionState.INLINE) {
+ synchronized (this) {
+ state.state = currentState;
+ notify();
+ }
+ } else {
+ state.state = currentState;
+ }
if (completion && state.handler != null) {
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
}
@@ -914,11 +923,18 @@ public class Nio2Endpoint extends Abstra
}
setError(ioe);
readPending.release();
+ if (state.block == BlockingMode.BLOCK) {
+ synchronized (this) {
+ state.state = Nio2Endpoint.isInline() ?
CompletionState.ERROR : CompletionState.DONE;
+ notify();
+ }
+ } else {
+ state.state = Nio2Endpoint.isInline() ?
CompletionState.ERROR : CompletionState.DONE;
+ }
if (exc instanceof AsynchronousCloseException) {
// If already closed, don't call onError and close again
return;
}
- state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR
: CompletionState.DONE;
if (state.handler != null) {
state.handler.failed(ioe, state.attachment);
}
@@ -949,7 +965,14 @@ public class Nio2Endpoint extends Abstra
}
if (complete) {
writePending.release();
- state.state = currentState;
+ if (state.block == BlockingMode.BLOCK && currentState
!= CompletionState.INLINE) {
+ synchronized (this) {
+ state.state = currentState;
+ notify();
+ }
+ } else {
+ state.state = currentState;
+ }
if (completion && state.handler != null) {
state.handler.completed(Long.valueOf(state.nBytes), state.attachment);
}
@@ -969,7 +992,14 @@ public class Nio2Endpoint extends Abstra
}
setError(ioe);
writePending.release();
- state.state = Nio2Endpoint.isInline() ? CompletionState.ERROR
: CompletionState.DONE;
+ if (state.block == BlockingMode.BLOCK) {
+ synchronized (this) {
+ state.state = Nio2Endpoint.isInline() ?
CompletionState.ERROR : CompletionState.DONE;
+ notify();
+ }
+ } else {
+ state.state = Nio2Endpoint.isInline() ?
CompletionState.ERROR : CompletionState.DONE;
+ }
if (state.handler != null) {
state.handler.failed(ioe, state.attachment);
}
@@ -995,14 +1025,25 @@ public class Nio2Endpoint extends Abstra
return CompletionState.NOT_DONE;
}
}
- OperationState<A> state = new OperationState<>(dsts, offset,
length, timeout, unit, attachment, check, handler);
+ OperationState<A> state = new OperationState<>(dsts, offset,
length, block, timeout, unit, attachment, check, handler);
+ ScatterReadCompletionHandler<A> completion = new
ScatterReadCompletionHandler<>();
Nio2Endpoint.startInline();
- getSocket().read(dsts, offset, length, timeout, unit, state, new
ScatterReadCompletionHandler<>());
+ getSocket().read(dsts, offset, length, timeout, unit, state,
completion);
Nio2Endpoint.endInline();
- if (block == BlockingMode.BLOCK && state.state ==
CompletionState.PENDING) {
- if (!awaitReadComplete(timeout, unit)) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
+ if (block == BlockingMode.BLOCK) {
+ synchronized (completion) {
+ if (state.state == CompletionState.PENDING) {
+ try {
+ completion.wait(unit.toMillis(timeout));
+ if (state.state == CompletionState.PENDING) {
+ handler.failed(new SocketTimeoutException(),
attachment);
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(new SocketTimeoutException(),
attachment);
+ return CompletionState.ERROR;
+ }
+ }
}
}
return state.state;
@@ -1016,9 +1057,14 @@ public class Nio2Endpoint extends Abstra
}
@Override
- public <A> CompletionState write(ByteBuffer[] srcs, int offset, int
length,
+ public <A> CompletionState write(ByteBuffer[] srcs, int offset, int
length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A>
handler) {
+ IOException ioe = getError();
+ if (ioe != null) {
+ handler.failed(ioe, attachment);
+ return CompletionState.ERROR;
+ }
if (block != BlockingMode.NON_BLOCK) {
try {
if (!writePending.tryAcquire(timeout, unit)) {
@@ -1034,14 +1080,25 @@ public class Nio2Endpoint extends Abstra
return CompletionState.NOT_DONE;
}
}
- OperationState<A> state = new OperationState<>(srcs, offset,
length, timeout, unit, attachment, check, handler);
+ OperationState<A> state = new OperationState<>(srcs, offset,
length, block, timeout, unit, attachment, check, handler);
+ GatherWriteCompletionHandler<A> completion = new
GatherWriteCompletionHandler<>();
Nio2Endpoint.startInline();
- getSocket().write(srcs, offset, length, timeout, unit, state, new
GatherWriteCompletionHandler<>());
+ getSocket().write(srcs, offset, length, timeout, unit, state,
completion);
Nio2Endpoint.endInline();
- if (block == BlockingMode.BLOCK && state.state ==
CompletionState.PENDING) {
- if (!awaitWriteComplete(timeout, unit)) {
- handler.failed(new SocketTimeoutException(), attachment);
- return CompletionState.ERROR;
+ if (block == BlockingMode.BLOCK) {
+ synchronized (completion) {
+ if (state.state == CompletionState.PENDING) {
+ try {
+ completion.wait(unit.toMillis(timeout));
+ if (state.state == CompletionState.PENDING) {
+ handler.failed(new SocketTimeoutException(),
attachment);
+ return CompletionState.ERROR;
+ }
+ } catch (InterruptedException e) {
+ handler.failed(new SocketTimeoutException(),
attachment);
+ return CompletionState.ERROR;
+ }
+ }
}
}
return state.state;
@@ -1282,11 +1339,13 @@ public class Nio2Endpoint extends Abstra
try {
if (readPending.tryAcquire(timeout, unit)) {
readPending.release();
+ return true;
+ } else {
+ return false;
}
} catch (InterruptedException e) {
return false;
}
- return true;
}
@@ -1295,11 +1354,13 @@ public class Nio2Endpoint extends Abstra
try {
if (writePending.tryAcquire(timeout, unit)) {
writePending.release();
+ return true;
+ } else {
+ return false;
}
} catch (InterruptedException e) {
return false;
}
- return true;
}
/*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]