This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f8ff786 [SPARK-37984][SHUFFLE] Avoid calculating all outstanding
requests to improve performance
f8ff786 is described below
commit f8ff7863e792b833afb2ff603878f29d4a9888e6
Author: weixiuli <[email protected]>
AuthorDate: Sun Jan 23 20:23:20 2022 -0600
[SPARK-37984][SHUFFLE] Avoid calculating all outstanding requests to
improve performance
### What changes were proposed in this pull request?
Avoid calculating all outstanding requests to improve performance.
### Why are the changes needed?
Follow the comment
(https://github.com/apache/spark/pull/34711#pullrequestreview-835520984) , we
can implement a "has outstanding requests" method in the response handler that
doesn't even need to get a count,let's do this with PR.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Exist unittests.
Closes #35276 from weixiuli/SPARK-37984.
Authored-by: weixiuli <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../apache/spark/network/client/TransportResponseHandler.java | 10 ++++++++--
.../apache/spark/network/server/TransportChannelHandler.java | 3 +--
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 576c088..261f205 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -140,7 +140,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
@Override
public void channelInactive() {
- if (numOutstandingRequests() > 0) {
+ if (hasOutstandingRequests()) {
String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {}
is closed",
numOutstandingRequests(), remoteAddress);
@@ -150,7 +150,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
@Override
public void exceptionCaught(Throwable cause) {
- if (numOutstandingRequests() > 0) {
+ if (hasOutstandingRequests()) {
String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {}
is closed",
numOutstandingRequests(), remoteAddress);
@@ -275,6 +275,12 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
(streamActive ? 1 : 0);
}
+ /** Check if there are any outstanding requests (fetch requests + rpcs) */
+ public Boolean hasOutstandingRequests() {
+ return streamActive || !outstandingFetches.isEmpty() ||
!outstandingRpcs.isEmpty() ||
+ !streamCallbacks.isEmpty();
+ }
+
/** Returns the time in nanoseconds of when the last request was sent out. */
public long getTimeOfLastRequestNs() {
return timeOfLastRequestNs.get();
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index 275e64e..d197032 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -161,8 +161,7 @@ public class TransportChannelHandler extends
SimpleChannelInboundHandler<Message
boolean isActuallyOverdue =
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() >
requestTimeoutNs;
if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
- boolean hasInFlightRequests =
responseHandler.numOutstandingRequests() > 0;
- if (hasInFlightRequests) {
+ if (responseHandler.hasOutstandingRequests()) {
String address = getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while
there are outstanding " +
"requests. Assuming connection is dead; please adjust" +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]